Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions app/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class BaseConsumer(abc.ABC):
group_id: str
event_schema: type[BaseEvent[Any]]

# Subclasses can set this to skip messages older than N ms.
# None = no staleness filter (default). Useful for real-time
# audio workers to discard backlogged chunks from dead sessions.
max_message_age_ms: int | None = None

# Declared here so Mypy can track it on the class body
_initialized: bool = False

Expand Down Expand Up @@ -108,6 +113,25 @@ async def _consume_loop(self) -> None:
if not self._running:
break

# Staleness guard: skip (and commit past) messages that are
# older than `max_message_age_ms`. This prevents workers from
# processing large backlogs of audio from dead sessions whose
# room IDs no longer exist in Redis.
if self.max_message_age_ms is not None:
import time as _time

age_ms = _time.time() * 1000 - msg.timestamp
if age_ms > self.max_message_age_ms:
topic_safe = sanitize_log_args(self.topic)[0]
logger.debug(
"Skipping stale message on '%s' (age=%.0fms > limit=%dms)",
topic_safe,
age_ms,
self.max_message_age_ms,
)
await self._consumer.commit()
continue

try:
event = self.event_schema.model_validate(msg.value)
await self._process_with_retry(event)
Expand Down
46 changes: 44 additions & 2 deletions app/kafka/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,19 @@ def register_consumer(self, consumer: BaseConsumer) -> None:
logger.info("Registered consumer for topic: '%s'", topic_safe)

async def _init_topics(self) -> None:
"""Create required topics if they don't exist."""
"""Create required topics if they don't exist, then enforce retention."""
from aiokafka.admin import ( # type: ignore[import-untyped]
AIOKafkaAdminClient,
NewTopic,
)

from app.kafka.topics import TOPICS_TO_CREATE
from app.kafka.topics import (
AUDIO_RAW,
AUDIO_SYNTHESIZED,
TEXT_ORIGINAL,
TEXT_TRANSLATED,
TOPICS_TO_CREATE,
)

admin_client = AIOKafkaAdminClient(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
Expand Down Expand Up @@ -108,6 +114,42 @@ async def _init_topics(self) -> None:
topic_names = [t.name for t in topics_to_create_metadata]
logger.info("Creating missing Kafka topics: %s", topic_names)
await admin_client.create_topics(topics_to_create_metadata)

# --- Enforce short retention on real-time pipeline topics ---
# 5 minutes is more than enough for any active session; anything
# older is from a dead session and should be discarded automatically.
# This works whether the topic was just created or already existed.
PIPELINE_TOPICS = [
AUDIO_RAW,
AUDIO_SYNTHESIZED,
TEXT_ORIGINAL,
TEXT_TRANSLATED,
]
RETENTION_MS = "300000" # 5 minutes

try:
from aiokafka.admin import ConfigResource

config_resources = [
ConfigResource(
resource_type="topic",
name=topic,
configs={"retention.ms": RETENTION_MS},
)
for topic in PIPELINE_TOPICS
]
await admin_client.alter_configs(config_resources)
logger.info(
"Set retention.ms=%s on pipeline topics: %s",
RETENTION_MS,
PIPELINE_TOPICS,
)
except Exception as alter_err:
error_safe = sanitize_log_args(alter_err)[0]
logger.warning(
"Could not set topic retention (non-fatal): %s", error_safe
)

except Exception as e:
error_safe = sanitize_log_args(e)[0]
logger.warning("Failed to auto-create Kafka topics: %s", error_safe)
Expand Down
22 changes: 1 addition & 21 deletions app/modules/meeting/ws_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async def egress_task() -> None:
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"
consumer = AIOKafkaConsumer(
AUDIO_SYNTHESIZED,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
Expand All @@ -240,11 +240,6 @@ async def egress_task() -> None:
egress_ready.set() # Unblock ingest so ingest can still run
return

logger.info(
"Egress consumer ready. group=%s listening_language=%s",
sanitize_for_log(group_id),
sanitize_for_log(listening_language),
)
egress_ready.set() # Signal that we are ready to receive

# Track the highest sequence seen to drop stale frames arriving out-of-order
Expand All @@ -256,15 +251,6 @@ async def egress_task() -> None:
event = SynthesizedAudioEvent.model_validate(msg.value)
payload = event.payload

logger.info(
"Egress received: room=%s target_lang=%s"
" listening_lang=%s seq=%d",
sanitize_for_log(payload.room_id),
sanitize_for_log(payload.target_language),
sanitize_for_log(listening_language),
sanitize_for_log(payload.sequence_number),
)

# Filter by Room
if payload.room_id != room_code:
continue
Expand Down Expand Up @@ -306,12 +292,6 @@ async def egress_task() -> None:
audio_bytes = base64.b64decode(payload.audio_data)
try:
await websocket.send_bytes(audio_bytes)
logger.info(
"Egress: sent %d bytes to user=%s (seq=%d)",
len(audio_bytes),
sanitize_for_log(user_id),
sanitize_for_log(payload.sequence_number),
)
except Exception as send_err:
logger.warning(
"Egress: WebSocket send failed for user=%s: %s",
Expand Down
5 changes: 5 additions & 0 deletions app/services/stt_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class STTWorker(BaseConsumer):
group_id = "stt-worker-group"
event_schema = AudioChunkEvent

# Skip audio chunks older than 2 minutes — they belong to sessions whose
# room IDs no longer exist in Redis, so the translation worker would find
# no participants and produce nothing anyway.
max_message_age_ms = 120_000 # 2 minutes

async def handle(self, event: BaseEvent[Any]) -> None:
"""Process a single audio chunk: decode → STT → publish transcript.

Expand Down
1 change: 1 addition & 0 deletions app/services/translation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TranslationWorker(BaseConsumer):
topic = TEXT_ORIGINAL
group_id = "translation-worker-group"
event_schema = TranscriptionEvent
max_message_age_ms = 120_000 # skip transcriptions from dead sessions

def __init__(self, producer: object) -> None:
super().__init__(producer=producer)
Expand Down
1 change: 1 addition & 0 deletions app/services/tts_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TTSWorker(BaseConsumer):
topic = TEXT_TRANSLATED
group_id = "tts-worker-group"
event_schema = TranslationEvent
max_message_age_ms = 120_000 # skip translations from dead sessions

async def handle(self, event: BaseEvent[Any]) -> None:
"""Process a translation: synthesize audio → publish.
Expand Down
Loading