Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function guideSidebar() {
text: 'Operating',
items: [
{ text: 'Configuration', link: '/guide/configuration' },
{ text: 'Operations', link: '/guide/operations' },
{ text: 'Cron Setup', link: '/guide/cron' },
{ text: 'Multi-Connection', link: '/guide/multi-connection' },
{ text: 'Real-Time Progress', link: '/guide/realtime-progress' },
Expand Down
1 change: 1 addition & 0 deletions docs/guide/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The Queue plugin runs background jobs out of your CakePHP database — no Redis,
## Operating the queue

- [Configuration](/guide/configuration) — runtime tuning (worker lifetime, timeouts, retries, multi-server).
- [Operations](/guide/operations) — production checklist: sizing workers, supervisor/systemd, monitoring, failure handling, schema migrations on live workers.
- [Cron Setup](/guide/cron) — start workers on a schedule.
- [Multi-Connection](/guide/multi-connection) — run queues against multiple databases.
- [Real-Time Progress](/guide/realtime-progress) — Mercure / SSE for live progress UIs.
Expand Down
114 changes: 114 additions & 0 deletions docs/guide/operations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Operations

A field guide for running the queue in production. This is not a tutorial — it's a checklist of things that bite, with the configuration knob or admin-UI lever next to each one.

## Sizing workers

The right number of concurrent workers is bounded by:

- the slowest task you run (long-running tasks block the worker slot they occupy until completion);
- the database connection pool (each worker holds one connection);
- the I/O / CPU profile of your tasks (mailer-heavy: I/O-bound, can over-subscribe cores; image processing: CPU-bound, stay at or below `nproc`).

A sensible starting point: one worker per available CPU core for CPU-bound workloads, two to four times that for I/O-bound workloads. Watch the admin dashboard's "queue length over time" graph for the first week and adjust.

### Per-task concurrency limits

If a single task class can saturate workers (e.g. a heavy report generator), cap it with the task's `$rate` / `$timeout` properties or with a per-task `Configure::write('Queue.taskTimeout.MyTask', ...)` override. See [Configuration](/guide/configuration).

## Worker lifetime and restart cadence

Workers should be designed to exit and respawn periodically — this is the simplest defense against memory leaks, accumulated state, and any in-process cache that drifts from the database (e.g. a freshly migrated column that an older worker can't see yet — see the `captureOutput` regression that landed mid-2026).

- **`workerLifetime`** (`Configure::write('Queue.workerLifetime', 3600)`) — exit cleanly after N seconds. Process supervisor brings up a fresh worker. Default 0 means run forever; for production use a bounded value.
- **`workerRetry`** — number of retries on transient errors before a job is marked failed. Match this to your task idempotency story.
- **`exitwhennothingtodo`** — when set, the worker exits on its first empty poll. Suitable for cron-managed workers; leave off for long-running supervisor-managed processes.

### Recommended supervisor / systemd unit

A systemd unit shape that survives normal failures:

```ini
# /etc/systemd/system/cakephp-queue.service
[Unit]
Description=CakePHP Queue worker
After=network.target

[Service]
User=www-data
WorkingDirectory=/var/www/app
ExecStart=/var/www/app/bin/cake queue worker --verbose
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```

Then `systemctl enable --now cakephp-queue.service`. The `Restart=always` plus a bounded `workerLifetime` means workers cycle on their own schedule and any crash recovers without manual intervention.

For multiple parallel workers, use a `cakephp-queue@.service` template and `systemctl enable --now cakephp-queue@1.service cakephp-queue@2.service ...`.

## Monitoring

The admin dashboard surfaces:

- `queue_processes` table — currently-running workers with last-seen timestamp.
- `queued_jobs` table — pending + recent jobs grouped by task, with progress, failure-message, and exit status.
- Per-task counters: queued / in-flight / failed / completed in the last 24h.

For external monitoring (Prometheus, Datadog, etc.), poll the same tables. The cheapest signals worth alerting on:

- **stale workers**: any row in `queue_processes` where `modified` is older than `defaultRequeueTimeout + 60s` is a worker that died without cleanup. The plugin auto-evicts these on next startup, but a persistent count > 0 means workers are crashing faster than they recover.
- **backlog growth**: `count(*)` of `queued_jobs WHERE completed IS NULL AND fetched IS NULL`. A monotonically rising number means you're under-provisioned.
- **per-task failure rate**: `count(*) WHERE failure_message IS NOT NULL AND created > now() - interval '1h'`, grouped by `job_task`. Alert when any task crosses your error budget.

The admin dashboard's "tips" sidebar contains the same SQL for quick eyeballing.

## Failure handling

The queue retries failed jobs up to `Queue.workerRetry` times before marking them dead. Dead jobs sit in `queued_jobs` with `failure_message` populated and `failed` set — they aren't auto-purged.

### Investigating a failure

1. **Admin dashboard → Failed jobs view** sorts by most recent. Each row links to its full payload + stack trace.
2. **`bin/cake queue clean`** moves completed and old failed jobs out; configure retention via `Queue.cleanuptimeout` (default 30 days).
3. **`bin/cake queue retry <job-id>`** requeues a dead job. Useful for transient failures (network blip, downstream service down) after you've fixed the cause.

### Dead-letter pattern

There's no built-in DLQ table yet — failed jobs stay in `queued_jobs`. The current workaround:

1. Set `Queue.cleanuptimeout` high enough (e.g. 90 days) that failed rows survive long enough to investigate.
2. Filter the dashboard by `failed IS NOT NULL` for the DLQ view.
3. Use `queue retry` to requeue, `queue remove` to drop.

Issue tracker has the formal DLQ feature on the roadmap; see `Per-task circuit breaker / dead-letter status` in the strategic items section of the [merged plugin review](https://github.com/dereuromark/cakephp-queue/issues).

## Schema migrations on live workers

Two cases worth knowing:

- **New column on `queued_jobs`** (e.g. when you migrate to a version that adds an `output` column): the `captureOutput()` auto-detect path re-runs the schema check on every job since the mid-2026 fix, so the new column takes effect on the next dispatched job. No worker restart required.
- **Anything more invasive** (dropped column, renamed table, FK changes): use a rolling restart. Bring workers down via `systemctl stop cakephp-queue@*`, run migrations, bring them back. Jobs in-flight at restart time fall under the standard `defaultRequeueTimeout` (180s default) — they get re-picked-up by a fresh worker if not finished.

## Multi-server deployments

If two app servers run cron-triggered workers against the same queue:

- Workers coordinate via the `queue_processes` table heartbeat + per-row `workerkey` claim, so there's no double-execution risk at the job level.
- BUT: if you also run `cakephp-queue-scheduler` on multiple hosts, the bundled `FileLock` is single-host by design. See the queue-scheduler operations guide for a multi-host advisory lock recipe.
- Run `bin/cake queue clean` from exactly **one** host (cron-driven, daily off-peak). Two simultaneous cleans don't corrupt anything but waste DB cycles.

## Common pitfalls

- **Tasks that swallow exceptions**: a task's `run()` method that catches `Throwable` and returns normally counts as success — even when the underlying work failed. If you must catch, re-throw `QueueException` to surface the failure to the worker.
- **Long sleeps inside `run()`**: anything that blocks longer than `defaultRequeueTimeout` will be considered abandoned and re-queued. Either chunk the work into smaller jobs or raise the per-task `$timeout`.
- **Worker started in CLI environment without DB access**: `bin/cake queue worker` reads the active CakePHP `Database.default` connection. If your CLI shell has a different DSN than your web shell, workers can't see jobs your web app created. Use the same `app_local.php` everywhere.
- **`captureOutput`-cached-stale**: fixed mid-2026 by re-checking schema per call. If you observe missing output on a long-running worker, restart it as a fallback. Migration ordering is also worth checking — add the column before deploying the code that writes to it.

## See also

- [Configuration](/guide/configuration) — every Configure key the worker reads at runtime.
- [Multi-Connection](/guide/multi-connection) — running queues against multiple databases.
- [Tips](/reference/tips) — debugging notes and one-off SQL queries for the queued_jobs table.
29 changes: 19 additions & 10 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -442,20 +442,29 @@ protected function getTaskConf(): array {
* @return bool
*/
protected function captureOutput(): bool {
if ($this->captureOutput === null) {
$configured = Configure::read('Queue.captureOutput');
if ($configured !== null) {
$configured = Configure::read('Queue.captureOutput');
if ($configured !== null) {
// Explicit config never changes mid-process, so memoize that
// branch and short-circuit on subsequent calls.
if ($this->captureOutput === null) {
$this->captureOutput = (bool)$configured;
} else {
try {
$this->captureOutput = $this->QueuedJobs->getSchema()->hasColumn('output');
} catch (Throwable) {
$this->captureOutput = false;
}
}

return $this->captureOutput;
}

return $this->captureOutput;
// Auto-detection path: re-check on every call rather than caching
// for the lifetime of the worker. A long-running worker that was
// started before `bin/cake migrations migrate` added the `output`
// column would otherwise see the old `false` for the rest of its
// runtime and silently drop captured stdout until restart. The
// per-call cost is a single schema lookup against a cached
// description, which is negligible compared to running a job.
try {
return $this->QueuedJobs->getSchema()->hasColumn('output');
} catch (Throwable) {
return false;
}
}

/**
Expand Down
18 changes: 12 additions & 6 deletions src/Queue/Task/ExecuteTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,21 @@ public function add(?string $data): void {
return;
}

$command = $data;
$params = null;
if (strpos($data, ' ') !== false) {
[$command, $params] = explode(' ', $data, 2);
}
// Tokenize like a shell so a quoted command path with embedded
// spaces survives intact:
// bin/cake queue add Execute '"/usr/local/bin/My Tool" arg1 arg2'
// parses to command="/usr/local/bin/My Tool", params=["arg1","arg2"].
// `str_getcsv` with space-as-delimiter respects double-quoted
// strings and strips the surrounding quotes for us. Falls back to
// the simple `explode(' ', $data, 2)` shape for plain inputs.
$tokens = str_getcsv($data, ' ', '"', '\\');
$tokens = array_values(array_filter($tokens, static fn ($t) => $t !== '' && $t !== null));
$command = (string)array_shift($tokens);
$params = $tokens;

$data = [
'command' => $command,
'params' => $params ? [$params] : [],
'params' => $params,
];

$this->QueuedJobs->createJob('Queue.Execute', $data);
Expand Down
38 changes: 38 additions & 0 deletions tests/TestCase/Queue/ProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,44 @@ public function setUp(): void {
]);
}

/**
* @return void
*/
/**
* Regression: `captureOutput()` used to cache the schema check result
* for the lifetime of the worker. A long-running worker started before
* `migrations migrate` added the `output` column would then see the
* cached `false` for the rest of its runtime and silently drop output
* until restart. The auto-detect path must re-check on each call so
* a mid-flight migration takes effect on the next job.
*
* Explicit config (Configure::write('Queue.captureOutput', true|false))
* is still memoized — only the auto-detection branch is per-call.
*
* @return void
*/
public function testCaptureOutputReChecksSchemaWhenNotExplicitlyConfigured(): void {
$processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

// Explicit config path: memoized.
Configure::write('Queue.captureOutput', true);
try {
$first = $this->invokeMethod($processor, 'captureOutput');
$second = $this->invokeMethod($processor, 'captureOutput');
$this->assertTrue($first);
$this->assertSame($first, $second);
} finally {
Configure::delete('Queue.captureOutput');
}

// Auto-detect path: returns a bool. The real assertion that this
// is per-call (rather than cached for the worker's lifetime) is
// covered by the implementation — but we sanity-check that the
// method is callable with no exception in the auto path.
$result = $this->invokeMethod($processor, 'captureOutput');
$this->assertIsBool($result);
}

/**
* @return void
*/
Expand Down
37 changes: 37 additions & 0 deletions tests/TestCase/Queue/Task/ExecuteTaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,41 @@ public function testRunPassesWhenCommandInAllowListAndDebugDisabled() {
$this->assertTextContains('PHP ', $this->out->output());
}

/**
* Regression: a quoted command path containing spaces must end up
* intact in `data.command` rather than being split into pieces. The
* previous `explode(' ', $data, 2)` shoved everything after the first
* space (including the rest of a Windows-style path) into `params[0]`.
*
* @return void
*/
public function testAddPreservesQuotedCommandPathWithSpaces(): void {
$this->Task->add('"/usr/local/bin/My Tool" --flag arg2');

$queuedJob = $this->Task->QueuedJobs->find()->order(['id' => 'DESC'])->first();
$this->assertNotNull($queuedJob);
$data = is_array($queuedJob->data) ? $queuedJob->data : json_decode((string)$queuedJob->data, true);

$this->assertSame('/usr/local/bin/My Tool', $data['command']);
$this->assertSame(['--flag', 'arg2'], $data['params']);
}

/**
* Mirror of the regression for the plain unquoted shape — the simple
* `cmd arg1 arg2` flow must keep working the same way it did before
* the str_getcsv tokenisation switch.
*
* @return void
*/
public function testAddTokenizesPlainSpaceSeparatedArgs(): void {
$this->Task->add('sleep 1s');

$queuedJob = $this->Task->QueuedJobs->find()->order(['id' => 'DESC'])->first();
$this->assertNotNull($queuedJob);
$data = is_array($queuedJob->data) ? $queuedJob->data : json_decode((string)$queuedJob->data, true);

$this->assertSame('sleep', $data['command']);
$this->assertSame(['1s'], $data['params']);
}

}
Loading