11 changes: 11 additions & 0 deletions src/config.py
@@ -713,6 +713,17 @@ class DeriverSettings(HonchoSettings):
] = 1.0
STALE_SESSION_TIMEOUT_MINUTES: Annotated[int, Field(default=5, gt=0, le=1440)] = 5

# Hard upper bound on a single work-unit pass through process_work_unit.
# If the inner LLM call hangs (e.g. CF Gateway streaming a Gemini response
# that never terminates), asyncio.wait_for raises TimeoutError, the
# `async with self.semaphore` block exits, and the slot is released so
# other workers can claim new work units. Without this bound the worker
# pool deadlocks: all N slots are held by hung tasks, the semaphore stays
# permanently locked, and the polling loop never reaches
# cleanup_stale_work_units.
WORK_UNIT_TIMEOUT_SECONDS: Annotated[
int, Field(default=600, gt=0, le=7200)
] = 600

# Retention window (seconds) for keeping errored items in the queue
QUEUE_ERROR_RETENTION_SECONDS: Annotated[
int, Field(default=30 * 24 * 3600, gt=0)
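As a minimal sketch of the release mechanics this comment describes (the worker shape below is illustrative; `hung_llm_call` and the simplified `process_work_unit` are stand-ins, not Honcho's actual code), bounding the awaited call with `asyncio.wait_for` guarantees the `async with` block exits even when the inner call never returns:

```python
import asyncio

WORK_UNIT_TIMEOUT_SECONDS = 600  # mirrors the new setting's default


async def hung_llm_call() -> None:
    # Stands in for a streamed response that never terminates.
    await asyncio.sleep(float("inf"))


async def process_work_unit(semaphore: asyncio.Semaphore) -> None:
    async with semaphore:  # slot acquired
        try:
            await asyncio.wait_for(hung_llm_call(), timeout=WORK_UNIT_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            # wait_for cancels the hung coroutine before raising; exiting
            # the `async with` block then releases the slot either way.
            pass
```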
46 changes: 37 additions & 9 deletions src/deriver/queue_manager.py
@@ -384,14 +384,26 @@
self.queue_empty_flag.clear()
continue

# Check if we have capacity before querying
# Always reap stale AQS rows, even when the pool is full.
# Previously cleanup ran only after the `if self.semaphore.locked()`
# capacity check below, which deadlocks the system when all workers
# are wedged on a hung LLM call: cleanup never runs, AQS rows grow
# unbounded, and no new claims are possible. Cleanup is cheap; do it
# unconditionally on every poll tick.
try:
await self.cleanup_stale_work_units()
except Exception as cleanup_exc:
logger.exception(
"Stale work unit cleanup failed: %s", cleanup_exc
)

# Check if we have capacity before querying for new work
if self.semaphore.locked():
# logger.debug("All workers busy, waiting")
await asyncio.sleep(settings.DERIVER.POLLING_SLEEP_INTERVAL_SECONDS)
continue

try:
await self.cleanup_stale_work_units()
await self.refresh_queue_health_metrics()
claimed_work_units = await self.get_and_claim_work_units()
if claimed_work_units:
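To make the deadlock condition concrete, here is a small self-contained demo (illustrative only, not the deriver's code) showing that once every slot is held by a wedged task, `semaphore.locked()` stays true indefinitely, so any cleanup gated on free capacity would never run:

```python
import asyncio


async def demo() -> None:
    sem = asyncio.Semaphore(2)  # a two-slot worker pool

    async def wedged() -> None:
        async with sem:
            await asyncio.sleep(float("inf"))  # a hung LLM call

    tasks = [asyncio.create_task(wedged()) for _ in range(2)]
    await asyncio.sleep(0)  # let both tasks claim their slots
    assert sem.locked()  # the old gating would skip cleanup here, forever
    for t in tasks:
        t.cancel()


asyncio.run(demo())
```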
@@ -843,12 +855,21 @@
for item in items_to_process
if item.message_id is not None
]
await process_representation_batch(
messages_context,
message_level_configuration,
observers=observers,
observed=work_unit.observed,
queue_item_message_ids=queue_item_message_ids,
# Bounded by DERIVER.WORK_UNIT_TIMEOUT_SECONDS so a hung
# LLM call (CF Gateway streaming a Gemini response that
# never terminates) raises TimeoutError instead of holding
# the semaphore forever. Without this the worker pool
# deadlocks: every slot held by a hung call, no new claims
# possible. See cleanup_stale_work_units gating fix above.
await asyncio.wait_for(
process_representation_batch(
messages_context,
message_level_configuration,
observers=observers,
observed=work_unit.observed,
queue_item_message_ids=queue_item_message_ids,
),
timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
)
await self.mark_queue_items_as_processed(
items_to_process, work_unit_key
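A quick illustration of the `wait_for` semantics this change relies on (again a sketch, not the deriver's code): on timeout the inner coroutine is cancelled first, so its `finally` blocks and context managers still run, and then the timeout error propagates to the caller, where the surrounding error handling can mark the items as errored:

```python
import asyncio


async def slow_batch() -> None:
    try:
        await asyncio.sleep(3600)
    finally:
        # Runs on cancellation, so resources held inside the batch are released.
        print("batch cleanup ran")


async def main() -> None:
    try:
        await asyncio.wait_for(slow_batch(), timeout=0.1)
    except asyncio.TimeoutError:  # alias of the builtin TimeoutError on 3.11+
        print("bounded: timeout surfaced to the caller")


asyncio.run(main())
```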
@@ -873,7 +894,14 @@
break

try:
await process_item(queue_item)
# Same WORK_UNIT_TIMEOUT_SECONDS bound as the
# representation path; it covers summary/dream/webhook
# task types so a hung specialist call cannot hold
# the semaphore indefinitely.
await asyncio.wait_for(
process_item(queue_item),
timeout=settings.DERIVER.WORK_UNIT_TIMEOUT_SECONDS,
)
await self.mark_queue_items_as_processed(
[queue_item], work_unit_key
)
@@ -1161,7 +1189,7 @@
)
now_utc = datetime.now(timezone.utc)
for item in items:
if item.created_at is not None:
[basedpyright warning, line 1192] Condition will always evaluate to True since the types "datetime" and "None" have no overlap (reportUnnecessaryComparison)
prometheus_metrics.observe_deriver_queue_item_latency(
workspace_name=work_unit.workspace_name,
task_type=work_unit.task_type,
@@ -1201,7 +1229,7 @@
workspace_name=work_unit.workspace_name,
task_type=work_unit.task_type,
)
if item.created_at is not None:
[basedpyright warning, line 1232] Condition will always evaluate to True since the types "datetime" and "None" have no overlap (reportUnnecessaryComparison)
prometheus_metrics.observe_deriver_queue_item_latency(
workspace_name=work_unit.workspace_name,
task_type=work_unit.task_type,
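The two basedpyright warnings above most likely stem from the ORM model typing `created_at` as a non-Optional `datetime`; that is an assumption here, and the hypothetical `QueueItem` below only mirrors the shape needed to reproduce the diagnostic. Under such a declaration the `is not None` guard is statically redundant:

```python
from datetime import datetime

from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class QueueItem(Base):  # hypothetical stand-in for the real model
    __tablename__ = "queue"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Typed as plain `datetime`, never `datetime | None`.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())


def observe(item: QueueItem) -> None:
    # basedpyright: reportUnnecessaryComparison -- the declared type is
    # `datetime`, which has no overlap with `None`.
    if item.created_at is not None:
        ...
```

Dropping the guard, or widening the annotation to `Mapped[datetime | None]` if the attribute can genuinely be unset before a flush, would resolve the warning.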