diff --git a/src/config.py b/src/config.py index 2d19911b5..2df302a38 100644 --- a/src/config.py +++ b/src/config.py @@ -713,6 +713,17 @@ class DeriverSettings(HonchoSettings): ] = 1.0 STALE_SESSION_TIMEOUT_MINUTES: Annotated[int, Field(default=5, gt=0, le=1440)] = 5 + # Hard upper bound on a single work-unit pass through process_work_unit. + # If the inner LLM call hangs (e.g. CF Gateway streaming a Gemini response + # that never terminates), asyncio.wait_for raises TimeoutError, the + # `async with self.semaphore` block exits, and the slot is released so + # other workers can claim other work units. Without this bound the worker + # pool deadlocks: all N slots held by hung tasks, semaphore permanently + # locked, polling loop never reaches cleanup_stale_work_units. + WORK_UNIT_TIMEOUT_SECONDS: Annotated[ + int, Field(default=600, gt=0, le=7200) + ] = 600 + + # Retention window (seconds) for keeping errored items in the queue QUEUE_ERROR_RETENTION_SECONDS: Annotated[ int, Field(default=30 * 24 * 3600, gt=0) diff --git a/src/deriver/queue_manager.py b/src/deriver/queue_manager.py index ee13743be..5d0851632 100644 --- a/src/deriver/queue_manager.py +++ b/src/deriver/queue_manager.py @@ -384,14 +384,26 @@ async def polling_loop(self) -> None: self.queue_empty_flag.clear() continue - # Check if we have capacity before querying + # Always reap stale AQS rows, even when the pool is full. + # Previously this was gated behind `if self.semaphore.locked()` + # below — which deadlocks the system when all workers are + # wedged on a hung LLM call: cleanup never runs, AQS rows go + # unbounded, no new claims possible. Cleanup is cheap; do it + # unconditionally on every poll tick.
+ try: + await self.cleanup_stale_work_units() + except Exception as cleanup_exc: + logger.exception( + "Stale work unit cleanup failed: %s", cleanup_exc + ) + + # Check if we have capacity before querying for new work if self.semaphore.locked(): # logger.debug("All workers busy, waiting") await asyncio.sleep(settings.DERIVER.POLLING_SLEEP_INTERVAL_SECONDS) continue try: - await self.cleanup_stale_work_units() await self.refresh_queue_health_metrics() claimed_work_units = await self.get_and_claim_work_units() if claimed_work_units: @@ -843,12 +855,21 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None: for item in items_to_process if item.message_id is not None ] - await process_representation_batch( - messages_context, - message_level_configuration, - observers=observers, - observed=work_unit.observed, - queue_item_message_ids=queue_item_message_ids, + # Bounded by DERIVER.WORK_UNIT_TIMEOUT_SECONDS so a hung + # LLM call (CF Gateway streaming a Gemini response that + # never terminates) raises TimeoutError instead of holding + # the semaphore forever. Without this the worker pool + # deadlocks: every slot held by a hung call, no new claims + # possible. See cleanup_stale_work_units gating fix above. + await asyncio.wait_for( + process_representation_batch( + messages_context, + message_level_configuration, + observers=observers, + observed=work_unit.observed, + queue_item_message_ids=queue_item_message_ids, + ), + timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS, ) await self.mark_queue_items_as_processed( items_to_process, work_unit_key @@ -873,7 +894,14 @@ async def process_work_unit(self, work_unit_key: str, worker_id: str) -> None: break try: - await process_item(queue_item) + # Same WORK_UNIT_TIMEOUT_SECONDS bound as the + # representation path — covers summary/dream/webhook + # task types so a hung specialist call cannot hold + # the semaphore indefinitely.
+ await asyncio.wait_for( + process_item(queue_item), + timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS, + ) await self.mark_queue_items_as_processed( [queue_item], work_unit_key )