diff --git a/.gitignore b/.gitignore index 3df4c92..9153b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ coverage npm-debug.log yarn-error.log +# OS specific +.DS_Store + # Editors specific .fleet .idea diff --git a/README.md b/README.md index aafc6cf..3cee4fa 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ npm install @boringnode/queue - **Priority Queues**: Process high-priority jobs first - **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once - **Job Grouping**: Organize related jobs for monitoring +- **Job Deduplication**: Prevent duplicate jobs with custom IDs - **Retry with Backoff**: Exponential, linear, or fixed backoff strategies - **Job Timeout**: Fail or retry jobs that exceed a time limit - **Job History**: Retain completed/failed jobs for debugging @@ -131,6 +132,80 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025') The `groupId` is stored with job data and accessible via `job.data.groupId`. +## Job Deduplication + +Prevent the same job from being pushed multiple times. Four modes, all via `.dedup()`: + +### Simple (skip while job exists) + +```typescript +// First dispatch - job is created +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() + +// Second dispatch with same dedup ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +``` + +### Throttle (skip within TTL window) + +```typescript +// Within 5s, duplicates are skipped. After 5s, a new job is created. +await SendEmailJob.dispatch({ to: 'user@example.com' }) + .dedup({ id: 'welcome-123', ttl: '5s' }) + .run() +``` + +### Extend (reset TTL on duplicate) + +```typescript +// Each duplicate push resets the TTL timer. 
+await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run() +``` + +### Debounce (replace payload + reset TTL) + +```typescript +// Within the 2s window, the latest payload overwrites the previous pending job. +await SaveDraftJob.dispatch({ content: 'latest draft' }) + .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) + .run() +``` + +### Inspecting the outcome + +`DispatchResult` tells you what happened: + +```typescript +const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' }) + .dedup({ id: 'draft-42', ttl: '2s', replace: true }) + .run() + +// deduped: 'added' | 'skipped' | 'replaced' | 'extended' +// jobId: the UUID of the job (the existing one when deduped) +``` + +### How it works + +- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key. +- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds. +- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws. +- `replace` only applies to jobs in `pending` or `delayed` state. Active (executing) jobs are left alone; the dispatch returns `{ deduped: 'skipped' }`. +- `replace` swaps the **payload only** — priority, queue, delay, and groupId of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire. +- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped. +- Atomic and race-free: + - **Redis**: `SET + PEXPIRE` under a Lua script with `HSETNX`-style guards. + - **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction. + - **SyncAdapter**: executes inline, no dedup support. + +### Caveats + +- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. 
+- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field. +- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch. +- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned. +- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs. +- Knex concurrent race: two `pushOn` calls with the same dedup id firing at the exact same instant on Postgres (READ COMMITTED) can both succeed (rare). Serialize at the app layer if strict guarantees are required, or use Redis. + ## Job History & Retention Keep completed and failed jobs for debugging: @@ -536,7 +611,7 @@ import * as boringqueue from '@boringnode/queue' const instrumentation = new QueueInstrumentation({ messagingSystem: 'boringqueue', // default - executionSpanLinkMode: 'link', // or 'parent' + executionSpanLinkMode: 'link', // or 'parent' }) instrumentation.enable() @@ -549,19 +624,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes. 
-| Attribute | Kind | Description | -| ------------------------------- | ------- | ------------------------------------------ | -| `messaging.system` | Semconv | `'boringqueue'` (configurable) | -| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | -| `messaging.destination.name` | Semconv | Queue name | -| `messaging.message.id` | Semconv | Job ID for single-message spans | -| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | -| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | -| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) | -| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | -| `messaging.job.group_id` | Custom | Queue-specific group identifier | -| `messaging.job.priority` | Custom | Queue-specific job priority | -| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | +| Attribute | Kind | Description | +| ------------------------------- | ------- | --------------------------------------------- | +| `messaging.system` | Semconv | `'boringqueue'` (configurable) | +| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | +| `messaging.destination.name` | Semconv | Queue name | +| `messaging.message.id` | Semconv | Job ID for single-message spans | +| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | +| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | +| `messaging.job.name` | Custom | Job class name (e.g. 
`SendEmailJob`) | +| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | +| `messaging.job.group_id` | Custom | Queue-specific group identifier | +| `messaging.job.priority` | Custom | Queue-specific job priority | +| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | | `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing | ### Trace Context Propagation diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index e05067e..8bd8e12 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -1,4 +1,5 @@ import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -7,6 +8,17 @@ import type { ScheduleListOptions, } from '../types/main.js' +/** + * Result of a push operation when dedup was involved. + * `outcome` tells the dispatcher what happened; `jobId` is the ID of the + * existing job when deduped (skipped/replaced/extended). + */ +export interface PushResult { + outcome: DedupOutcome + /** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */ + jobId: string +} + /** * A job that has been acquired by a worker for processing. * Extends JobData with the timestamp when the job was acquired. @@ -119,24 +131,27 @@ export interface Adapter { * Push a job to the default queue for immediate processing. * * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - push(jobData: JobData): Promise + push(jobData: JobData): Promise /** * Push a job to a specific queue for immediate processing. * * @param queue - The queue name to push to * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushOn(queue: string, jobData: JobData): Promise + pushOn(queue: string, jobData: JobData): Promise /** * Push a job to the default queue with a delay. 
* * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLater(jobData: JobData, delay: number): Promise + pushLater(jobData: JobData, delay: number): Promise /** * Push a job to a specific queue with a delay. @@ -144,8 +159,9 @@ export interface Adapter { * @param queue - The queue name to push to * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise + pushLaterOn(queue: string, jobData: JobData, delay: number): Promise /** * Push multiple jobs to the default queue for immediate processing. diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index ddc528a..3671cc4 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -2,7 +2,7 @@ import assert from 'node:assert/strict' import { randomUUID } from 'node:crypto' import { isDeepStrictEqual } from 'node:util' import { CronExpressionParser } from 'cron-parser' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { JobData, JobClass, @@ -16,6 +16,14 @@ import { DEFAULT_PRIORITY } from '../constants.js' import { parse } from '../utils.js' import { Job } from '../job.js' +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + replace?: boolean + extend?: boolean +} + interface ActiveJob { job: JobData acquiredAt: number @@ -71,6 +79,7 @@ export class FakeAdapter implements Adapter { #pendingTimeouts = new Set() #schedules = new Map() #pushedJobs: FakeJobRecord[] = [] + #dedupIndex = new Map>() #onDispose?: () => void /** @@ -116,6 +125,7 @@ export class FakeAdapter implements Adapter { this.#failedJobs.clear() this.#schedules.clear() 
this.#pushedJobs = [] + this.#dedupIndex.clear() } assertPushed(matcher: FakeJobMatcher, query?: FakeJobQuery): void { @@ -155,24 +165,36 @@ export class FakeAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped + this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped + this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -180,6 +202,10 @@ export class FakeAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -226,6 +252,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -244,6 +271,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || 
removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -298,6 +326,7 @@ export class FakeAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(active.queue, active.job) continue } @@ -526,26 +555,131 @@ export class FakeAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age !== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 
0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) + } + } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#getDedupEntry(queue, dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + this.#setDedupEntry(queue, dedupId, { + jobId: jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + queue: string, + jobId: string + ): { + job: JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: 
pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if (failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #getDedupEntry(queue: string, dedupId: string): DedupEntry | undefined { + return this.#dedupIndex.get(queue)?.get(dedupId) + } + + #setDedupEntry(queue: string, dedupId: string, entry: DedupEntry): void { + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, entry) + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#getDedupEntry(queue, job.dedup.id) + // Only delete if the entry points to THIS job (replace may have re-pointed it elsewhere) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) } } diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..607becc 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -1,8 +1,9 @@ import { randomUUID } from 'node:crypto' import KnexPkg from 'knex' import type { Knex } from 'knex' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -117,9 +118,7 @@ export class KnexAdapter implements Adapter { // Update job to active status // For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim - const updateQuery = trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) + const updateQuery = trx(this.#jobsTable).where('id', 
job.id).where('queue', queue) if (!this.#supportsSkipLocked()) { updateQuery.where('status', 'pending') @@ -178,14 +177,11 @@ export class KnexAdapter implements Adapter { const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) - .update({ - status: 'pending', - score, - execute_at: null, - }) + await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({ + status: 'pending', + score, + execute_at: null, + }) } }) } @@ -331,45 +327,49 @@ export class KnexAdapter implements Adapter { if (retryAt && retryAt.getTime() > now) { // Move to delayed - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'delayed', - data: updatedData, - worker_id: null, - acquired_at: null, - score: null, - execute_at: retryAt.getTime(), - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'delayed', + data: updatedData, + worker_id: null, + acquired_at: null, + score: null, + execute_at: retryAt.getTime(), + }) } else { // Move back to pending const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'pending', - data: updatedData, - worker_id: null, - acquired_at: null, - score, - execute_at: null, - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'pending', + data: updatedData, + worker_id: null, + acquired_at: null, + score, + execute_at: null, + }) } } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const priority = jobData.priority ?? 
DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'pending', + data: JSON.stringify(jobData), + score, + }) + } + await this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, @@ -379,13 +379,23 @@ export class KnexAdapter implements Adapter { }) } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'delayed', + data: JSON.stringify(jobData), + execute_at: executeAt, + }) + } + await this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, @@ -395,6 +405,97 @@ export class KnexAdapter implements Adapter { }) } + async #pushWithDedup( + queue: string, + jobData: JobData, + insertRow: Record + ): Promise { + const dedup = jobData.dedup! + + try { + return await this.#pushWithDedupTxn(queue, jobData, insertRow, dedup) + } catch (err) { + if (this.#isMissingDedupColumn(err)) { + throw new Error( + `Dedup columns missing on "${this.#jobsTable}". 
Run QueueSchemaService.addDedupColumns() on your jobs table before dispatching jobs with .dedup().`, + { cause: err } + ) + } + throw err + } + } + + #isMissingDedupColumn(err: unknown): boolean { + if (!err || typeof err !== 'object') return false + const message = (err as { message?: string }).message + if (!message) return false + // Postgres: 'column "dedup_id" does not exist' + // SQLite: 'no such column: dedup_id' + // MySQL: "Unknown column 'dedup_id' in 'where clause'" + return ( + /dedup_id/.test(message) && /(does not exist|no such column|Unknown column)/i.test(message) + ) + } + + #pushWithDedupTxn( + queue: string, + jobData: JobData, + insertRow: Record, + dedup: NonNullable + ): Promise { + return this.#connection.transaction(async (trx) => { + const existing = await trx(this.#jobsTable) + .where('queue', queue) + .where('dedup_id', dedup.id) + .orderBy('dedup_at', 'desc') + .forUpdate() + .first() + + const now = Date.now() + + if (existing) { + const dedupAt = existing.dedup_at != null ? Number(existing.dedup_at) : null + const dedupTtl = existing.dedup_ttl != null ? Number(existing.dedup_ttl) : null + const withinTtl = dedupTtl === null || (dedupAt !== null && now - dedupAt < dedupTtl) + + if (withinTtl) { + const status = existing.status as JobStatus + const replaceable = status === 'pending' || status === 'delayed' + + if (dedup.replace && replaceable) { + const storedData = + typeof existing.data === 'string' ? 
JSON.parse(existing.data) : existing.data + const newData = { ...storedData, payload: jobData.payload } + const updates: Record = { data: JSON.stringify(newData) } + if (dedup.extend && dedupTtl) { + updates.dedup_at = now + } + await trx(this.#jobsTable).where({ id: existing.id, queue }).update(updates) + return { outcome: 'replaced' as DedupOutcome, jobId: existing.id as string } + } + + if (dedup.extend && dedupTtl) { + await trx(this.#jobsTable).where({ id: existing.id, queue }).update({ dedup_at: now }) + return { outcome: 'extended' as DedupOutcome, jobId: existing.id as string } + } + + return { outcome: 'skipped' as DedupOutcome, jobId: existing.id as string } + } + // TTL expired — fall through to insert a new row. Old row (if retained + // history) is left intact. + } + + await trx(this.#jobsTable).insert({ + ...insertRow, + dedup_id: dedup.id, + dedup_at: now, + dedup_ttl: dedup.ttl ?? null, + }) + + return { outcome: 'added' as DedupOutcome, jobId: jobData.id } + }) + } + async pushMany(jobs: JobData[]): Promise { return this.pushManyOn('default', jobs) } @@ -402,6 +503,10 @@ export class KnexAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const now = Date.now() const rows = jobs.map((job) => ({ id: job.id, @@ -458,10 +563,7 @@ export class KnexAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - remove the job - await trx(this.#jobsTable) - .where('id', row.id) - .where('queue', queue) - .delete() + await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete() } else { // Recover: increment stalledCount and put back in pending jobData.stalledCount = currentStalledCount + 1 @@ -534,9 +636,9 @@ export class KnexAdapter implements Adapter { } async getSchedule(id: string): Promise { - const row = (await 
this.#connection(this.#schedulesTable) - .where('id', id) - .first()) as ScheduleRow | undefined + const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as + | ScheduleRow + | undefined if (!row) return null return this.#rowToScheduleData(row) @@ -565,16 +667,12 @@ export class KnexAdapter implements Adapter { if (updates.runCount !== undefined) data.run_count = updates.runCount if (Object.keys(data).length > 0) { - await this.#connection(this.#schedulesTable) - .where('id', id) - .update(data) + await this.#connection(this.#schedulesTable).where('id', id).update(data) } } async deleteSchedule(id: string): Promise { - await this.#connection(this.#schedulesTable) - .where('id', id) - .delete() + await this.#connection(this.#schedulesTable).where('id', id).delete() } async claimDueSchedule(): Promise { @@ -629,13 +727,11 @@ export class KnexAdapter implements Adapter { } // Update atomically - await trx(this.#schedulesTable) - .where('id', row.id) - .update({ - next_run_at: nextRunAt, - last_run_at: now, - run_count: newRunCount, - }) + await trx(this.#schedulesTable).where('id', row.id).update({ + next_run_at: nextRunAt, + last_run_at: now, + run_count: newRunCount, + }) // Return schedule data (before update state for payload) return this.#rowToScheduleData(row) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index bb66c05..90e4a2b 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -2,7 +2,8 @@ import { randomUUID } from 'node:crypto' import { Redis, type RedisOptions } from 'ioredis' import { DEFAULT_PRIORITY } from '../constants.js' import { calculateScore } from '../utils.js' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' +import type { DedupOutcome } from '../types/main.js' import type { JobData, JobRecord, @@ -35,6 +36,59 @@ const PUSH_JOB_SCRIPT = ` return 1 ` +/** + * Lua 
script for pushing a dedup job. + * + * Behavior: + * - If dedup key exists AND job still exists AND within TTL: apply replace/extend, skip insert. + * - If dedup key exists but job data missing (orphan): proceed to insert new. + * - If TTL expired or no prior entry: insert new job, record dedup key with TTL. + * + * Replace only applies to non-active (pending/delayed) jobs. Active jobs are skipped. + * + * Returns {outcome, job_id}: outcome ∈ 'added' | 'skipped' | 'replaced' | 'extended'. + */ +const PUSH_DEDUP_JOB_SCRIPT = ` + local data_key = KEYS[1] + local pending_key = KEYS[2] + local dedup_key = KEYS[3] + local active_key = KEYS[4] + local job_id = ARGV[1] + local job_data = ARGV[2] + local score = tonumber(ARGV[3]) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local has_data = redis.call('HEXISTS', data_key, existing) == 1 + if has_data then + local is_active = redis.call('HEXISTS', active_key, existing) == 1 + if replace == 1 and not is_active then + redis.call('HSET', data_key, existing, job_data) + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'replaced', existing} + end + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + return {'extended', existing} + end + return {'skipped', existing} + end + end + + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', pending_key, score, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} +` + /** * Lua script for pushing a delayed job. * Stores job data in the central hash and adds jobId to delayed ZSET. @@ -52,6 +106,51 @@ const PUSH_DELAYED_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a dedup delayed job. + * Same semantics as PUSH_DEDUP_JOB_SCRIPT but adds to delayed ZSET. 
+ */ +const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` + local data_key = KEYS[1] + local delayed_key = KEYS[2] + local dedup_key = KEYS[3] + local active_key = KEYS[4] + local job_id = ARGV[1] + local job_data = ARGV[2] + local execute_at = tonumber(ARGV[3]) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local has_data = redis.call('HEXISTS', data_key, existing) == 1 + if has_data then + local is_active = redis.call('HEXISTS', active_key, existing) == 1 + if replace == 1 and not is_active then + redis.call('HSET', data_key, existing, job_data) + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'replaced', existing} + end + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + return {'extended', existing} + end + return {'skipped', existing} + end + end + + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', delayed_key, execute_at, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} +` + /** * Lua script for atomic job acquisition. * 1. Check and process delayed jobs @@ -110,16 +209,27 @@ const ACQUIRE_JOB_SCRIPT = ` /** * Lua script for removing a job completely (no history). + * Also cleans up the dedup key if the job had dedup metadata. */ const REMOVE_JOB_SCRIPT = ` local data_key = KEYS[1] local active_key = KEYS[2] local job_id = ARGV[1] + local dedup_prefix = ARGV[2] if redis.call('HEXISTS', active_key, job_id) == 0 then return 0 end + -- Read job data to extract dedup.id before deleting + local job_data = redis.call('HGET', data_key, job_id) + if job_data then + local ok, job = pcall(cjson.decode, job_data) + if ok and job and job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end + end + redis.call('HDEL', active_key, job_id) redis.call('HDEL', data_key, job_id) @@ -129,6 +239,7 @@ const REMOVE_JOB_SCRIPT = ` /** * Lua script for finalizing a job in history. * Removes from active, stores finalization info, and prunes old records. + * When pruning removes job data, also deletes the associated dedup key. */ const FINALIZE_JOB_SCRIPT = ` local data_key = KEYS[1] @@ -140,6 +251,7 @@ const FINALIZE_JOB_SCRIPT = ` local max_age = tonumber(ARGV[3]) local max_count = tonumber(ARGV[4]) local error_message = ARGV[5] + local dedup_prefix = ARGV[6] -- Verify job is active if redis.call('HEXISTS', active_key, job_id) == 0 then @@ -159,11 +271,24 @@ const FINALIZE_JOB_SCRIPT = ` redis.call('HSET', history_key, job_id, cjson.encode(record)) redis.call('ZADD', index_key, now, job_id) + local function delete_dedup_for(ids) + for i = 1, #ids do + local d = redis.call('HGET', data_key, ids[i]) + if d then + local ok, job = pcall(cjson.decode, d) + if ok and job and job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end + end + end + end + -- Prune by age if max_age and max_age > 0 then local cutoff = now - max_age local expired = redis.call('ZRANGEBYSCORE', index_key, 0, cutoff) if #expired > 0 then + delete_dedup_for(expired) redis.call('ZREM', index_key, unpack(expired)) redis.call('HDEL', history_key, unpack(expired)) redis.call('HDEL', data_key, unpack(expired)) @@ -177,6 +302,7 @@ const FINALIZE_JOB_SCRIPT = ` local excess = size - max_count local stale = redis.call('ZRANGE', index_key, 0, excess - 1) if #stale > 0 then + delete_dedup_for(stale) redis.call('ZREM', index_key, unpack(stale)) redis.call('HDEL', history_key, unpack(stale)) redis.call('HDEL', data_key, unpack(stale)) @@ -250,6 +376,7 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` local now = tonumber(ARGV[1]) local stalled_threshold = tonumber(ARGV[2]) local max_stalled_count = tonumber(ARGV[3]) + local dedup_prefix = ARGV[4] local recovered = 0 local stalled_cutoff = now - stalled_threshold @@ -275,7 +402,10 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` -- Check if job has exceeded max stalled count if current_stalled_count >= max_stalled_count then - -- Job failed permanently, remove data too + -- Job failed permanently, remove data + dedup key + if job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end redis.call('HDEL', data_key, job_id) else -- Recover: increment stalledCount and put back in pending @@ -480,6 +610,14 @@ export class RedisAdapter implements Adapter { } } + #getDedupKey(queue: string, dedupId: string): string { + return `${this.#getDedupPrefix(queue)}${dedupId}` + } + + #getDedupPrefix(queue: string): string { + return `${redisKey}::${queue}::dedup::` + } + setWorkerId(workerId: string): void { this.#workerId = workerId } @@ -518,10 +656,11 @@ export class RedisAdapter implements Adapter { async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -536,7 +675,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - '' + '', + dedupPrefix ) } @@ -547,10 +687,11 @@ export class RedisAdapter implements Adapter { removeOnFail?: JobRetention ): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnFail) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -565,7 +706,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - error?.message || '' + error?.message || '', + dedupPrefix ) } @@ -608,18 +750,37 @@ export class RedisAdapter implements Adapter { return JSON.parse(result as string) } - push(jobData: JobData): Promise { + push(jobData: JobData): Promise { return 
this.pushOn('default', jobData) } - pushLater(jobData: JobData, delay: number): Promise { + pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_DELAYED_JOB_SCRIPT, + 4, + keys.data, + keys.delayed, + dedupKey, + keys.active, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + await this.#connection.eval( PUSH_DELAYED_JOB_SCRIPT, 2, @@ -631,12 +792,31 @@ export class RedisAdapter implements Adapter { ) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const keys = this.#getKeys(queue) const priority = jobData.priority ?? DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 4, + keys.data, + keys.pending, + dedupKey, + keys.active, + jobData.id, + JSON.stringify(jobData), + score.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? 
'1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + await this.#connection.eval( PUSH_JOB_SCRIPT, 2, @@ -655,6 +835,10 @@ export class RedisAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const keys = this.#getKeys(queue) const now = Date.now() const multi = this.#connection.multi() @@ -694,7 +878,8 @@ export class RedisAdapter implements Adapter { keys.pending, now.toString(), stalledThreshold.toString(), - maxStalledCount.toString() + maxStalledCount.toString(), + this.#getDedupPrefix(queue) ) return recovered as number diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index 796db00..d97fa55 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -59,6 +59,10 @@ export class SyncAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } diff --git a/src/job_batch_dispatcher.ts b/src/job_batch_dispatcher.ts index 61864af..35452aa 100644 --- a/src/job_batch_dispatcher.ts +++ b/src/job_batch_dispatcher.ts @@ -165,7 +165,6 @@ export class JobBatchDispatcher { const message: JobDispatchMessage = { jobs, queue: this.#queue } - await dispatchChannel.tracePromise(async () => { await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs)) }, message) diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 9198895..d860f19 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -15,11 +15,12 @@ import { parse } from './utils.js' * * ``` * Job.dispatch(payload) - * .toQueue('emails') // optional: target queue - * .priority(1) // optional: 1-10, lower = 
higher priority - * .in('5m') // optional: delay before processing - * .with('redis') // optional: specific adapter - * .run() // dispatch the job + * .toQueue('emails') // optional: target queue + * .priority(1) // optional: 1-10, lower = higher priority + * .in('5m') // optional: delay before processing + * .dedup({ id: 'order-123' }) // optional: deduplication + * .with('redis') // optional: specific adapter + * .run() // dispatch the job * ``` * * @typeParam T - The payload type for this job @@ -47,6 +48,12 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string + #dedup?: { + id: string + ttl?: number + extend?: boolean + replace?: boolean + } /** * Create a new job dispatcher. @@ -148,6 +155,70 @@ export class JobDispatcher { return this } + /** + * Configure deduplication for this job. + * + * Modes: + * - **Simple** (`{ id }`): skip duplicates while the job exists. + * - **Throttle** (`{ id, ttl }`): skip duplicates within a TTL window. + * - **Debounce** (`{ id, ttl, replace: true }`): replace payload of the existing + * non-active job on duplicate within TTL. + * - **Extend** (`{ id, ttl, extend: true }`): reset the TTL window on each duplicate. + * + * The id is automatically prefixed with the job name to prevent collisions + * between different job types. + * + * @param options.id - Unique deduplication key + * @param options.ttl - TTL as Duration ('5s', 5000). Required for extend/replace. + * @param options.extend - Reset TTL on duplicate within window. + * @param options.replace - Replace payload of existing non-active job within window. 
+ * + * @example + * ```typescript + * // Simple dedup + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .dedup({ id: 'order-123' }) + * + * // Throttle: 5 second window + * await SendEmailJob.dispatch({ to: 'x' }) + * .dedup({ id: 'welcome', ttl: '5s' }) + * + * // Debounce: replace payload within window + * await SaveDraftJob.dispatch({ content: 'latest' }) + * .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) + * ``` + */ + dedup(options: { id: string; ttl?: Duration; extend?: boolean; replace?: boolean }): this { + if (!options.id) { + throw new Error('Dedup ID must be a non-empty string') + } + + if (options.id.length > 400) { + throw new Error('Dedup ID must be 400 characters or less') + } + + if ((options.extend || options.replace) && options.ttl === undefined) { + throw new Error('dedup.ttl is required when extend or replace is set') + } + + let parsedTtl: number | undefined + if (options.ttl !== undefined) { + parsedTtl = parse(options.ttl) + if (!Number.isFinite(parsedTtl) || parsedTtl < 0) { + throw new Error('dedup.ttl must be a non-negative duration') + } + } + + this.#dedup = { + id: options.id, + ttl: parsedTtl, + extend: options.extend, + replace: options.replace, + } + + return this + } + /** * Use a specific adapter for this job. * @@ -182,6 +253,7 @@ export class JobDispatcher { */ async run(): Promise { const id = randomUUID() + const dedupId = this.#dedup ? `${this.#name}::${this.#dedup.id}` : undefined debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -197,18 +269,40 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), + ...(dedupId + ? 
{ + dedup: { + id: dedupId, + ttl: this.#dedup!.ttl, + extend: this.#dedup!.extend, + replace: this.#dedup!.replace, + }, + } + : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } + let pushResult: { outcome: DispatchResult['deduped']; jobId: string } | undefined await dispatchChannel.tracePromise(async () => { - if (parsedDelay !== undefined) { - await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) - } else { - await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + const result = + parsedDelay !== undefined + ? await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) + : await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + + if (result && typeof result === 'object' && 'outcome' in result) { + pushResult = { outcome: result.outcome, jobId: result.jobId } + message.dedupOutcome = result.outcome } }, message) + if (pushResult && this.#dedup) { + return { + jobId: pushResult.jobId, + deduped: pushResult.outcome, + } + } + return { jobId: id } } diff --git a/src/queue_config_resolver.ts b/src/queue_config_resolver.ts index 1679f85..02e863b 100644 --- a/src/queue_config_resolver.ts +++ b/src/queue_config_resolver.ts @@ -102,10 +102,7 @@ export class QueueConfigResolver { * merge like `retry.maxRetries`. 
*/ #normalizeJobRetryConfig(jobOptions?: JobOptions): RetryConfig | undefined { - if ( - !jobOptions || - (jobOptions.retry === undefined && jobOptions.maxRetries === undefined) - ) { + if (!jobOptions || (jobOptions.retry === undefined && jobOptions.maxRetries === undefined)) { return undefined } diff --git a/src/queue_manager.ts b/src/queue_manager.ts index 76d4377..1ff8f23 100644 --- a/src/queue_manager.ts +++ b/src/queue_manager.ts @@ -7,7 +7,9 @@ import { QueueConfigResolver } from './queue_config_resolver.js' import type { Adapter } from './contracts/adapter.js' import type { AdapterFactory, JobFactory, QueueManagerConfig } from './types/main.js' -const noopInternalOperationWrapper: NonNullable = async (fn) => fn() +const noopInternalOperationWrapper: NonNullable< + QueueManagerConfig['internalOperationWrapper'] +> = async (fn) => fn() const noopExecutionWrapper: NonNullable = async (fn) => fn() type QueueManagerFakeState = { diff --git a/src/services/queue_schema.ts b/src/services/queue_schema.ts index ed53eef..addf3c7 100644 --- a/src/services/queue_schema.ts +++ b/src/services/queue_schema.ts @@ -26,15 +26,46 @@ export class QueueSchemaService { table.bigint('execute_at').unsigned().nullable() table.bigint('finished_at').unsigned().nullable() table.text('error').nullable() + table.string('dedup_id', 510).nullable() + table.bigint('dedup_at').unsigned().nullable() + table.bigint('dedup_ttl').unsigned().nullable() table.primary(['id', 'queue']) table.index(['queue', 'status', 'score']) table.index(['queue', 'status', 'execute_at']) table.index(['queue', 'status', 'finished_at']) + table.index(['queue', 'dedup_id']) extend?.(table) }) } + /** + * Idempotent migration: adds dedup columns (dedup_id, dedup_at, dedup_ttl) + * and a (queue, dedup_id) index to an existing jobs table. + * + * Safe to run multiple times. Uses hasColumn checks so it won't fail on re-runs. + * For large Postgres tables, consider pausing workers during the run. 
+ */ + async addDedupColumns(tableName: string = 'queue_jobs'): Promise { + const hasDedupId = await this.#connection.schema.hasColumn(tableName, 'dedup_id') + const hasDedupAt = await this.#connection.schema.hasColumn(tableName, 'dedup_at') + const hasDedupTtl = await this.#connection.schema.hasColumn(tableName, 'dedup_ttl') + + if (!hasDedupId || !hasDedupAt || !hasDedupTtl) { + await this.#connection.schema.alterTable(tableName, (table) => { + if (!hasDedupId) table.string('dedup_id', 510).nullable() + if (!hasDedupAt) table.bigint('dedup_at').unsigned().nullable() + if (!hasDedupTtl) table.bigint('dedup_ttl').unsigned().nullable() + }) + } + + if (!hasDedupId) { + await this.#connection.schema.alterTable(tableName, (table) => { + table.index(['queue', 'dedup_id']) + }) + } + } + /** * Creates the schedules table with the default schema. * The optional callback allows adding custom columns. @@ -57,10 +88,7 @@ export class QueueSchemaService { table.integer('run_count').unsigned().notNullable().defaultTo(0) table.timestamp('next_run_at').nullable() table.timestamp('last_run_at').nullable() - table - .timestamp('created_at') - .notNullable() - .defaultTo(this.#connection.fn.now()) + table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now()) table.index(['status', 'next_run_at']) extend?.(table) diff --git a/src/types/main.ts b/src/types/main.ts index 4bc68c9..a4ae51c 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -43,9 +43,20 @@ export type JobStatus = 'pending' | 'active' | 'delayed' | 'completed' | 'failed * console.log(`Dispatched job: ${jobId}`) * ``` */ +/** + * Outcome of a dedup-enabled dispatch. 
+ * - `added`: new job was inserted + * - `skipped`: duplicate found within TTL, skipped silently + * - `replaced`: duplicate found within TTL, existing job's payload was replaced + * - `extended`: duplicate found within TTL, TTL window was reset + */ +export type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended' + export interface DispatchResult { /** Unique identifier for this specific job instance */ jobId: string + /** Dedup outcome (only present when `.dedup()` was used). */ + deduped?: DedupOutcome } /** @@ -133,6 +144,28 @@ export interface JobData { * Injected by OTel plugin at dispatch time. */ traceContext?: Record + + /** + * Deduplication configuration for this job. + * When set, adapters apply dedup semantics keyed on `dedup.id`. + * Set automatically when `.dedup()` is called on the dispatcher. + */ + dedup?: { + /** Dedup key, prefixed with the job name (e.g. `SendInvoiceJob::order-123`). */ + id: string + /** + * TTL in milliseconds. When set, dedup lock auto-expires after TTL. + * After expiry, the same dedup id produces a brand-new job (coexists with prior). + * 0 or undefined = no TTL; dedup lock persists until the job is removed. + */ + ttl?: number + /** Reset the TTL window when a duplicate arrives within it. */ + extend?: boolean + /** Replace payload of the existing non-active job when a duplicate arrives within TTL. */ + replace?: boolean + /** Timestamp (ms) when this dedup entry was recorded. Set by adapters. */ + createdAt?: number + } } /** diff --git a/src/types/tracing_channels.ts b/src/types/tracing_channels.ts index e2507d8..77d2057 100644 --- a/src/types/tracing_channels.ts +++ b/src/types/tracing_channels.ts @@ -6,7 +6,7 @@ */ import type { AcquiredJob } from '../contracts/adapter.js' -import type { JobData } from './main.js' +import type { DedupOutcome, JobData } from './main.js' /** * Tracing data structure for job dispatch events. 
@@ -21,6 +21,13 @@ export type JobDispatchMessage = { /** Delay in milliseconds before the job becomes available */ delay?: number + /** + * Deduplication outcome when the job used `.dedup()`. Allows OTel/tracing + * consumers to distinguish added vs skipped/replaced/extended dispatches. + * Populated by the dispatcher after the push call completes. + */ + dedupOutcome?: DedupOutcome + /** Error that caused the dispatch to fail */ error?: Error } diff --git a/src/worker.ts b/src/worker.ts index 5316d00..83cec94 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -7,7 +7,13 @@ import { JobPool } from './job_pool.js' import { JobExecutionRuntime } from './job_runtime.js' import { dispatchChannel, executeChannel } from './tracing_channels.js' import type { Adapter, AcquiredJob } from './contracts/adapter.js' -import type { JobContext, JobOptions, JobRetention, QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { + JobContext, + JobOptions, + JobRetention, + QueueManagerConfig, + WorkerCycle, +} from './types/main.js' import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' import { Locator } from './locator.js' import { DEFAULT_PRIORITY } from './constants.js' @@ -347,11 +353,26 @@ export class Worker { return executeChannel.tracePromise(async () => { try { await runtime.execute(instance, payload, context) - await this.#wrapInternal(() => this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)) + await this.#wrapInternal(() => + this.#adapter.completeJob(job.id, queue, retention.removeOnComplete) + ) executeMessage.status = 'completed' - debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, (performance.now() - startTime).toFixed(2)) + debug( + 'worker %s: successfully executed job %s in %dms', + this.#id, + job.id, + (performance.now() - startTime).toFixed(2) + ) } catch (e) { - await this.#handleExecutionFailure({ error: e as Error, job, queue, instance, runtime, retention, 
executeMessage }) + await this.#handleExecutionFailure({ + error: e as Error, + job, + queue, + instance, + runtime, + retention, + executeMessage, + }) } executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) @@ -377,7 +398,12 @@ export class Worker { if (outcome.type === 'failed') { options.executeMessage.status = 'failed' await this.#wrapInternal(() => - this.#adapter.failJob(options.job.id, options.queue, outcome.storageError, options.retention.removeOnFail) + this.#adapter.failJob( + options.job.id, + options.queue, + outcome.storageError, + options.retention.removeOnFail + ) ) await options.instance.failed?.(outcome.hookError) return @@ -389,8 +415,15 @@ export class Worker { options.executeMessage.nextRetryAt = outcome.retryAt if (outcome.retryAt) { - debug('worker %s: job %s will retry at %s', this.#id, options.job.id, outcome.retryAt.toISOString()) - await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt)) + debug( + 'worker %s: job %s will retry at %s', + this.#id, + options.job.id, + outcome.retryAt.toISOString() + ) + await this.#wrapInternal(() => + this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt) + ) } else { await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue)) } @@ -426,7 +459,9 @@ export class Worker { } catch (error) { debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name) const retention = QueueManager.getConfigResolver().resolveJobOptions(queue) - await this.#wrapInternal(() => this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)) + await this.#wrapInternal(() => + this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail) + ) throw error } } diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index 08fbc12..fa64e31 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -1,5 +1,5 @@ import { randomUUID } 
from 'node:crypto' -import type { Adapter, AcquiredJob } from '../../src/contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../../src/contracts/adapter.js' import type { JobData, JobRecord, @@ -21,6 +21,14 @@ interface DelayedJob { executeAt: number } +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + replace?: boolean + extend?: boolean +} + export function memory() { return () => new MemoryAdapter() } @@ -33,6 +41,7 @@ export class MemoryAdapter implements Adapter { #failedJobs: Map = new Map() #pendingTimeouts: Set = new Set() #schedules: Map = new Map() + #dedupIndex: Map> = new Map() setWorkerId(_workerId: string): void {} @@ -46,23 +55,33 @@ export class MemoryAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped + if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } this.#queues.get(queue)!.push(jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped + if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) } @@ -73,12 +92,17 @@ export class MemoryAdapter implements Adapter { const timeout = setTimeout(() => { this.#pendingTimeouts.delete(timeout) this.#delayedJobs.get(queue)?.delete(jobData.id) - void this.pushOn(queue, 
jobData) + if (!this.#queues.has(queue)) { + this.#queues.set(queue, []) + } + this.#queues.get(queue)!.push(jobData) }, delay) this.#pendingTimeouts.add(timeout) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -86,6 +110,10 @@ export class MemoryAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -132,6 +160,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -150,6 +179,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -204,6 +234,7 @@ export class MemoryAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(queue, active.job) continue } @@ -396,26 +427,41 @@ export class MemoryAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age !== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 
0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) } } @@ -425,4 +471,85 @@ export class MemoryAdapter implements Adapter { return records.find((record) => record.data.id === jobId) ?? null } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#dedupIndex.get(queue)?.get(dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, { + jobId: jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + 
queue: string, + jobId: string + ): { + job: JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if (failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#dedupIndex.get(queue)?.get(job.dedup.id) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) + } + } } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ecf5727..874956a 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -613,7 +613,9 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.isNull(job3) }) - test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => { + test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ + assert, + }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1647,4 +1649,310 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(second!.id, 'medium') assert.equal(third!.id, 'low') }) + + test('pushOn with dedup should skip duplicate job', async ({ assert }) => { + const 
adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('test-queue') + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('pushOn without dedup should insert normally', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: { data: 'first' }, + attempts: 0, + }) + + await adapter.pushOn('test-queue', { + id: 'job-2', + name: 'TestJob', + payload: { data: 'second' }, + attempts: 0, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 2) + }) + + test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + const job = await adapter.getJob('TestJob::delayed-1', 'test-queue') + assert.isNotNull(job) + assert.deepEqual(job!.data.payload, { attempt: 1 }) + }) + + test('pushOn with dedup should allow same id on different queues', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { + id: 
'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'a' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + await adapter.pushOn('queue-b', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'b' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + const sizeA = await adapter.sizeOf('queue-a') + const sizeB = await adapter.sizeOf('queue-b') + assert.equal(sizeA, 1) + assert.equal(sizeB, 1) + }) + + test('dedup TTL: new job allowed after TTL expires', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('ttl-queue', { + id: 'uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + + const second = await adapter.pushOn('ttl-queue', { + id: 'uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + + await new Promise((r) => setTimeout(r, 150)) + + const third = await adapter.pushOn('ttl-queue', { + id: 'uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'added') + }) + + test('dedup replace: duplicate within TTL swaps payload on pending job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('rep-queue', { + id: 'rep-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true }, + }) + + const second = await adapter.pushOn('rep-queue', { + id: 'rep-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'replaced') + + const size = await 
adapter.sizeOf('rep-queue')
+    assert.equal(size, 1)
+
+    const job = await adapter.popFrom('rep-queue')
+    assert.deepEqual(job!.payload, { version: 2 })
+  })
+
+  test('dedup extend: duplicate within TTL resets the window', async ({ assert }) => {
+    const adapter = await options.createAdapter()
+    adapter.setWorkerId('worker-1')
+
+    await adapter.pushOn('ext-queue', {
+      id: 'ext-uuid-1',
+      name: 'TestJob',
+      payload: { n: 1 },
+      attempts: 0,
+      dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true },
+    })
+
+    await new Promise((r) => setTimeout(r, 60))
+
+    const second = await adapter.pushOn('ext-queue', {
+      id: 'ext-uuid-2',
+      name: 'TestJob',
+      payload: { n: 2 },
+      attempts: 0,
+      dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true },
+    })
+    assert.equal(second && typeof second === 'object' && second.outcome, 'extended')
+
+    await new Promise((r) => setTimeout(r, 60))
+
+    // Without extend, ~120ms would have elapsed since the first push — past the 100ms TTL.
+    const third = await adapter.pushOn('ext-queue', {
+      id: 'ext-uuid-3',
+      name: 'TestJob',
+      payload: { n: 3 },
+      attempts: 0,
+      dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true },
+    })
+    assert.equal(third && typeof third === 'object' && third.outcome, 'extended')
+  })
+
+  test('dedup: cleanup removes dedup entry when job is completed without retention', async ({
+    assert,
+  }) => {
+    const adapter = await options.createAdapter()
+    adapter.setWorkerId('worker-1')
+
+    await adapter.pushOn('clean-queue', {
+      id: 'clean-uuid-1',
+      name: 'TestJob',
+      payload: { n: 1 },
+      attempts: 0,
+      dedup: { id: 'TestJob::clean-1' },
+    })
+
+    const popped = await adapter.popFrom('clean-queue')
+    await adapter.completeJob(popped!.id, 'clean-queue', true)
+
+    // Dedup should be cleaned — new push should succeed
+    const second = await adapter.pushOn('clean-queue', {
+      id: 'clean-uuid-2',
+      name: 'TestJob',
+      payload: { n: 2 },
+      attempts: 0,
+      dedup: { id: 'TestJob::clean-1' },
+    })
+    assert.equal(second && typeof second === 'object' && second.outcome, 
'added') + }) + + test('dedup: cleanup removes dedup entry when job fails without retention', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('clean-fail', { + id: 'fail-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + + const popped = await adapter.popFrom('clean-fail') + await adapter.failJob(popped!.id, 'clean-fail', new Error('boom'), true) + + const second = await adapter.pushOn('clean-fail', { + id: 'fail-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + }) + + test('dedup: retryJob preserves dedup entry (new dispatch stays blocked)', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('retry-queue', { + id: 'retry-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + + const popped = await adapter.popFrom('retry-queue') + await adapter.retryJob(popped!.id, 'retry-queue') + + // retry puts job back — dedup entry still points to same job + const second = await adapter.pushOn('retry-queue', { + id: 'retry-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + }) + + test('dedup: pushManyOn rejects jobs with dedup', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await assert.rejects( + () => + adapter.pushManyOn('batch-queue', [ + { id: 'a', name: 'TestJob', payload: {}, attempts: 0 }, + { + id: 'b', + name: 'TestJob', + payload: {}, + attempts: 0, + dedup: { id: 'TestJob::batch-1' }, + }, + ]), + /dedup is not supported in batch dispatch/ + ) + }) } diff 
--git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index 50e6259..7f7a293 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -80,7 +80,9 @@ test.group('Adapter | Redis', (group) => { ) }) - test('deleteSchedule should not leave ghost index under write-failure chaos', async ({ assert }) => { + test('deleteSchedule should not leave ghost index under write-failure chaos', async ({ + assert, + }) => { const adapter = new RedisAdapter(connection) const id = 'chaos-delete-schedule' diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index a4eae64..94924b6 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,6 +91,64 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) + test('should skip duplicate pushOn when dedup is set', async ({ assert }) => { + const adapter = fake()() + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.size() + assert.equal(size, 1) + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + + test('should skip duplicate pushLaterOn when dedup is set', async () => { + const adapter = fake()() + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + test('should support job class matchers', async ({ assert }) => { const adapter = fake()() @@ -128,5 +186,4 @@ test.group('FakeAdapter', () => { await 
adapter.destroy() }) - }) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 7c71311..38243b0 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,6 +317,258 @@ test.group('JobDispatcher | groupId', () => { }) }) +test.group('JobDispatcher | dedup', () => { + test('should throw error when dedup id is empty', async ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', { data: 'test' }).dedup({ id: '' }), + 'Dedup ID must be a non-empty string' + ) + }) + + test('should store dedup id prefixed with job name', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const result = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + .dedup({ id: 'order-123' }) + .run() + + assert.match(result.jobId, /^[0-9a-f-]{36}$/) + assert.equal(result.deduped, 'added') + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.id, result.jobId) + assert.equal(job!.dedup?.id, 'SendInvoiceJob::order-123') + }) + + test('should set dedup field on job data when dedup is configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('UniqueJob', { data: 'test' }).dedup({ id: 'my-id' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.dedup?.id, 'UniqueJob::my-id') + }) + + test('should not set dedup field when dedup is not configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('RegularJob', { data: 'test' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isUndefined(job!.dedup) + }) + + test('should silently skip 
duplicate job with same dedup id', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('DedupJob', { attempt: 1 }).dedup({ id: 'dedup-1' }).run() + await new JobDispatcher('DedupJob', { attempt: 2 }).dedup({ id: 'dedup-1' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('should allow same dedup id for different job names', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('JobA', { type: 'a' }).dedup({ id: 'same-id' }).run() + await new JobDispatcher('JobB', { type: 'b' }).dedup({ id: 'same-id' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('should work with other options like priority and queue', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId, deduped } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + .dedup({ id: 'task-1' }) + .toQueue('high') + .priority(1) + .run() + + assert.match(jobId, /^[0-9a-f-]{36}$/) + assert.equal(deduped, 'added') + + const job = await sharedAdapter.popFrom('high') + assert.isNotNull(job) + assert.equal(job!.priority, 1) + assert.equal(job!.dedup?.id, 'PriorityDedupJob::task-1') + }) + + test('should throw when extend is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', extend: true }), + 'dedup.ttl is required when extend or replace is set' + ) + }) + + test('should throw when replace is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', 
{}).dedup({ id: 'x', replace: true }), + 'dedup.ttl is required when extend or replace is set' + ) + }) + + test('should throw when ttl is negative', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', ttl: -1 }), + 'dedup.ttl must be a non-negative duration' + ) + }) + + test('should throw when dedup id exceeds 400 chars', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'a'.repeat(401) }), + 'Dedup ID must be 400 characters or less' + ) + }) + + test('TTL: new job allowed after TTL expires', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ThrottleJob', { n: 1 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(first.deduped, 'added') + + const second = await new JobDispatcher('ThrottleJob', { n: 2 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(second.deduped, 'skipped') + + await setTimeout(150) + + const third = await new JobDispatcher('ThrottleJob', { n: 3 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(third.deduped, 'added') + assert.notEqual(third.jobId, first.jobId) + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('extend: duplicate within TTL resets the window', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ExtendJob', { n: 1 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(first.deduped, 'added') + + await setTimeout(60) + + const second = await new JobDispatcher('ExtendJob', { n: 2 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(second.deduped, 'extended') + assert.equal(second.jobId, first.jobId) + + await setTimeout(60) + + // 
Without extend, original 100ms TTL would've expired (120ms elapsed). + // With extend, second push reset timer → still within window. + const third = await new JobDispatcher('ExtendJob', { n: 3 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(third.deduped, 'extended') + }) + + test('replace: duplicate within TTL swaps the pending job payload', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ReplaceJob', { version: 1 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(first.deduped, 'added') + + const second = await new JobDispatcher('ReplaceJob', { version: 2 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(second.deduped, 'replaced') + assert.equal(second.jobId, first.jobId) + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { version: 2 }) + }) + + test('replace: active job is not replaced (returns skipped)', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('ActiveReplaceJob', { version: 1 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + await sharedAdapter.pop() // moves to active + + const second = await new JobDispatcher('ActiveReplaceJob', { version: 2 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + assert.equal(second.deduped, 'skipped') + }) +}) + test.group('JobBatchDispatcher', () => { test('should dispatch multiple jobs correctly', async ({ assert }) => { const sharedAdapter = memory()() diff --git a/tests/otel.spec.ts b/tests/otel.spec.ts index 4fe4b93..70a7a71 100644 --- a/tests/otel.spec.ts +++ b/tests/otel.spec.ts @@ -22,13 +22,17 @@ function 
makeJob(overrides: Partial = {}): AcquiredJob { * Creates an instrumentation with a fake QueueManager, * captures the injected wrappers from the patched init. */ -async function setupWithWrappers(config: ConstructorParameters[0] = {}) { +async function setupWithWrappers( + config: ConstructorParameters[0] = {} +) { const instrumentation = new QueueInstrumentation(config) instrumentation.enable() let capturedConfig: any const fakeManager = { - init: async (cfg: any) => { capturedConfig = cfg }, + init: async (cfg: any) => { + capturedConfig = cfg + }, } instrumentation.manuallyRegister({ QueueManager: fakeManager }) @@ -37,13 +41,21 @@ async function setupWithWrappers(config: ConstructorParameters(fn: () => Promise, job: AcquiredJob, queue: string) => Promise, - internalOperationWrapper: capturedConfig.internalOperationWrapper as (fn: () => Promise) => Promise, + executionWrapper: capturedConfig.executionWrapper as ( + fn: () => Promise, + job: AcquiredJob, + queue: string + ) => Promise, + internalOperationWrapper: capturedConfig.internalOperationWrapper as ( + fn: () => Promise + ) => Promise, } } test.group('QueueInstrumentation | lifecycle', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('enable() is idempotent', ({ assert }) => { @@ -80,7 +92,9 @@ test.group('QueueInstrumentation | lifecycle', (group) => { }) test.group('QueueInstrumentation | dispatch via DC', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates PRODUCER span when no active span', async ({ assert }) => { @@ -120,7 +134,10 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { assert.isDefined(parentSpan) assert.isDefined(producerSpan) assert.equal(producerSpan!.parentSpanContext?.spanId, parentSpan!.spanContext().spanId) - assert.equal(jobData.traceContext?.traceparent?.split('-')[2], 
producerSpan!.spanContext().spanId) + assert.equal( + jobData.traceContext?.traceparent?.split('-')[2], + producerSpan!.spanContext().spanId + ) instrumentation.disable() }) @@ -188,7 +205,9 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { }) test.group('QueueInstrumentation | execute via executionWrapper', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates CONSUMER span with semconv attributes', async ({ assert }) => { @@ -321,10 +340,21 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { test('queue_time_ms end-to-end via dispatch then execute', async ({ assert }) => { const { instrumentation, executionWrapper } = await setupWithWrappers() - const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0, createdAt: Date.now() } + const jobData: JobData = { + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + payload: {}, + attempts: 0, + createdAt: Date.now(), + } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) - const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', createdAt: jobData.createdAt, traceContext: jobData.traceContext }) + const job = makeJob({ + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + createdAt: jobData.createdAt, + traceContext: jobData.traceContext, + }) const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { @@ -347,11 +377,15 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { - await executionWrapper(async () => { - const tracer = trace.getTracer('test-child') - const childSpan = tracer.startSpan('child-operation') - childSpan.end() - }, job, 'default') + await executionWrapper( + async () => 
{ + const tracer = trace.getTracer('test-child') + const childSpan = tracer.startSpan('child-operation') + childSpan.end() + }, + job, + 'default' + ) }, message) const spans = getFinishedSpans() @@ -367,7 +401,9 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { }) test.group('QueueInstrumentation | trace linking', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('link mode links consumer to dispatch trace', async ({ assert }) => { @@ -394,7 +430,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test('parent mode makes consumer child of dispatch', async ({ assert }) => { - const { instrumentation, executionWrapper } = await setupWithWrappers({ executionSpanLinkMode: 'parent' }) + const { instrumentation, executionWrapper } = await setupWithWrappers({ + executionSpanLinkMode: 'parent', + }) const jobData: JobData = { id: 'parent-1', name: 'ParentedJob', payload: {}, attempts: 0 } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) @@ -417,7 +455,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test.group('QueueInstrumentation | manuallyRegister', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('patches init to inject wrappers', async ({ assert }) => { @@ -448,7 +488,9 @@ test.group('QueueInstrumentation | manuallyRegister', (group) => { }) test.group('QueueInstrumentation | custom config', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('custom messagingSystem attribute', async ({ assert }) => { diff --git a/tests/sync_adapter.spec.ts b/tests/sync_adapter.spec.ts index 5bf0e00..1d05d50 100644 --- a/tests/sync_adapter.spec.ts +++ b/tests/sync_adapter.spec.ts @@ -58,9 +58,7 @@ 
test.group('SyncAdapter', (group) => { assert.deepEqual(contextJobIds, Array(contextJobIds.length).fill(jobId)) }) - test('should log delayed sync job failures without unhandled rejections', async ({ - assert, - }) => { + test('should log delayed sync job failures without unhandled rejections', async ({ assert }) => { const logger = new MemoryLogger() let unhandledError: unknown const onUnhandledRejection = (error: unknown) => { diff --git a/tests/worker.spec.ts b/tests/worker.spec.ts index 226531b..fbe5b65 100644 --- a/tests/worker.spec.ts +++ b/tests/worker.spec.ts @@ -549,7 +549,9 @@ test.group('Worker', () => { const controller = new AbortController() const originalTimeout = AbortSignal.timeout const originalAddEventListener = controller.signal.addEventListener.bind(controller.signal) - const originalRemoveEventListener = controller.signal.removeEventListener.bind(controller.signal) + const originalRemoveEventListener = controller.signal.removeEventListener.bind( + controller.signal + ) let addedAbortListeners = 0 let removedAbortListeners = 0