From ac611c557a2ef4188aa9186d488f451f44da6303 Mon Sep 17 00:00:00 2001
From: Offending Commit
Date: Thu, 7 May 2026 16:29:28 -0500
Subject: [PATCH] fix(deriver): break worker-pool deadlock on hung LLM calls

Two compounding bugs caused the deriver worker pool to wedge after a
single CF-Gateway-streamed Gemini response failed to terminate:

1. process_work_unit holds `async with self.semaphore` across the inner
   LLM call (process_representation_batch / process_item). With no
   asyncio-level timeout, a hung HTTP read held the slot forever. Eight
   workers x one hung call each = pool fully locked.

2. polling_loop gated cleanup_stale_work_units behind
   `if self.semaphore.locked(): continue`, so once the pool was full the
   stale-AQS cleanup never ran. STALE_SESSION_TIMEOUT_MINUTES became a
   dead letter: stale active_queue_sessions rows were never reclaimed.

Pod restarts didn't help: new pods reclaimed the same poisoned
work_unit_keys and re-wedged within minutes.

Fixes:

- Add DERIVER_WORK_UNIT_TIMEOUT_SECONDS (default 600s) and wrap both
  process_representation_batch and process_item in asyncio.wait_for.
  TimeoutError propagates to _handle_processing_error, the `async with`
  block unwinds, and the semaphore slot is released.

- Move cleanup_stale_work_units above the semaphore-locked check so AQS
  rows always get reaped on every poll tick, even with a full pool.
  Cleanup is cheap; running it unconditionally costs one index scan per
  poll.

Symptoms before the fix: active_queue_sessions rows aging past
STALE_SESSION_TIMEOUT_MINUTES, the queue.processed=false count climbing
into the thousands across all task types, and the deriver pod alive
(PID 1 ok) but silent in the logs for hours.
---
 src/config.py                | 11 +++++++++
 src/deriver/queue_manager.py | 46 +++++++++++++++++++++++++++++-------
 2 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/src/config.py b/src/config.py
index 2d19911b5..2df302a38 100644
--- a/src/config.py
+++ b/src/config.py
@@ -713,6 +713,17 @@ class DeriverSettings(HonchoSettings):
     ] = 1.0
     STALE_SESSION_TIMEOUT_MINUTES: Annotated[int, Field(default=5, gt=0, le=1440)] = 5
 
+    # Hard upper bound on a single work-unit pass through process_work_unit.
+    # If the inner LLM call hangs (e.g. CF Gateway streaming a Gemini response
+    # that never terminates), asyncio.wait_for raises TimeoutError, the
+    # `async with self.semaphore` block exits, and the slot is released so
+    # other workers can claim other work units. Without this bound the worker
+    # pool deadlocks: all N slots held by hung tasks, semaphore permanently
+    # locked, polling loop never reaches cleanup_stale_work_units.
+    WORK_UNIT_TIMEOUT_SECONDS: Annotated[
+        int, Field(default=600, gt=0, le=7200)
+    ] = 600
+
     # Retention window (seconds) for keeping errored items in the queue
     QUEUE_ERROR_RETENTION_SECONDS: Annotated[
         int, Field(default=30 * 24 * 3600, gt=0)
diff --git a/src/deriver/queue_manager.py b/src/deriver/queue_manager.py
index ee13743be..5d0851632 100644
--- a/src/deriver/queue_manager.py
+++ b/src/deriver/queue_manager.py
@@ -384,14 +384,26 @@ async def polling_loop(self) -> None:
                 self.queue_empty_flag.clear()
                 continue
 
-            # Check if we have capacity before querying
+            # Always reap stale AQS rows, even when the pool is full.
+            # Previously this was gated behind `if self.semaphore.locked()`
+            # below — which deadlocks the system when all workers are
+            # wedged on a hung LLM call: cleanup never runs, AQS rows go
+            # unbounded, no new claims possible. Cleanup is cheap; do it
+            # unconditionally on every poll tick.
+            try:
+                await self.cleanup_stale_work_units()
+            except Exception as cleanup_exc:
+                logger.exception(
+                    "Stale work unit cleanup failed: %s", cleanup_exc
+                )
+
+            # Check if we have capacity before querying for new work
             if self.semaphore.locked():
                 # logger.debug("All workers busy, waiting")
                 await asyncio.sleep(settings.DERIVER.POLLING_SLEEP_INTERVAL_SECONDS)
                 continue
 
             try:
-                await self.cleanup_stale_work_units()
                 await self.refresh_queue_health_metrics()
                 claimed_work_units = await self.get_and_claim_work_units()
                 if claimed_work_units:
@@ -843,12 +855,21 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
                         for item in items_to_process
                         if item.message_id is not None
                     ]
-                    await process_representation_batch(
-                        messages_context,
-                        message_level_configuration,
-                        observers=observers,
-                        observed=work_unit.observed,
-                        queue_item_message_ids=queue_item_message_ids,
+                    # Bounded by DERIVER.WORK_UNIT_TIMEOUT_SECONDS so a hung
+                    # LLM call (CF Gateway streaming a Gemini response that
+                    # never terminates) raises TimeoutError instead of holding
+                    # the semaphore forever. Without this the worker pool
+                    # deadlocks: every slot held by a hung call, no new claims
+                    # possible. See cleanup_stale_work_units gating fix above.
+                    await asyncio.wait_for(
+                        process_representation_batch(
+                            messages_context,
+                            message_level_configuration,
+                            observers=observers,
+                            observed=work_unit.observed,
+                            queue_item_message_ids=queue_item_message_ids,
+                        ),
+                        timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
                     )
                     await self.mark_queue_items_as_processed(
                         items_to_process, work_unit_key
                     )
@@ -873,7 +894,14 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None:
                             break
 
                         try:
-                            await process_item(queue_item)
+                            # Same WORK_UNIT_TIMEOUT_SECONDS bound as the
+                            # representation path — covers summary/dream/webhook
+                            # task types so a hung specialist call cannot hold
+                            # the semaphore indefinitely.
+                            await asyncio.wait_for(
+                                process_item(queue_item),
+                                timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
+                            )
                             await self.mark_queue_items_as_processed(
                                 [queue_item], work_unit_key
                             )
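
Reviewer note: the sketch below is illustration only, not code from this
patch. It shows the mechanism the first fix relies on: asyncio.wait_for
cancels a hung awaitable once the timeout fires, the TimeoutError unwinds
the `async with semaphore` block, and the slot is released for other work
units. The names hung_llm_call and worker and the 2-second timeout are
invented for the demo.

import asyncio

# Stand-in for settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS (600s in the patch).
WORK_UNIT_TIMEOUT_SECONDS = 2

async def hung_llm_call() -> None:
    # Stand-in for an LLM call whose streamed HTTP response never terminates.
    await asyncio.sleep(3600)

async def worker(semaphore: asyncio.Semaphore, name: str) -> None:
    async with semaphore:
        try:
            # Without wait_for, this await would hold the semaphore slot forever.
            await asyncio.wait_for(hung_llm_call(), timeout=WORK_UNIT_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            # The exception unwinds `async with`, so the slot is released.
            print(f"{name}: timed out, slot released")

async def main() -> None:
    semaphore = asyncio.Semaphore(1)  # a one-slot pool wedges after a single hang
    await worker(semaphore, "worker-0")
    # The slot is free again, so a second pass acquires it immediately.
    await worker(semaphore, "worker-1")

asyncio.run(main())

When the timeout fires, asyncio.wait_for also cancels the inner coroutine,
so the stuck call is told to stop rather than left running unsupervised
while the worker moves on.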