diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py index 2fb187b..fef85ba 100644 --- a/app/kafka/consumer.py +++ b/app/kafka/consumer.py @@ -40,6 +40,11 @@ class BaseConsumer(abc.ABC): group_id: str event_schema: type[BaseEvent[Any]] + # Subclasses can set this to skip messages older than N ms. + # None = no staleness filter (default). Useful for real-time + # audio workers to discard backlogged chunks from dead sessions. + max_message_age_ms: int | None = None + # Declared here so Mypy can track it on the class body _initialized: bool = False @@ -108,6 +113,25 @@ async def _consume_loop(self) -> None: if not self._running: break + # Staleness guard: skip (and commit past) messages that are + # older than `max_message_age_ms`. This prevents workers from + # processing large backlogs of audio from dead sessions whose + # room IDs no longer exist in Redis. + if self.max_message_age_ms is not None: + import time as _time + + age_ms = _time.time() * 1000 - msg.timestamp + if age_ms > self.max_message_age_ms: + topic_safe = sanitize_log_args(self.topic)[0] + logger.debug( + "Skipping stale message on '%s' (age=%.0fms > limit=%dms)", + topic_safe, + age_ms, + self.max_message_age_ms, + ) + await self._consumer.commit() + continue + try: event = self.event_schema.model_validate(msg.value) await self._process_with_retry(event) diff --git a/app/kafka/manager.py b/app/kafka/manager.py index bad7fec..6690b05 100644 --- a/app/kafka/manager.py +++ b/app/kafka/manager.py @@ -73,13 +73,19 @@ def register_consumer(self, consumer: BaseConsumer) -> None: logger.info("Registered consumer for topic: '%s'", topic_safe) async def _init_topics(self) -> None: - """Create required topics if they don't exist.""" + """Create required topics if they don't exist, then enforce retention.""" from aiokafka.admin import ( # type: ignore[import-untyped] AIOKafkaAdminClient, NewTopic, ) - from app.kafka.topics import TOPICS_TO_CREATE + from app.kafka.topics import ( + AUDIO_RAW, + AUDIO_SYNTHESIZED, + TEXT_ORIGINAL, + TEXT_TRANSLATED, + TOPICS_TO_CREATE, + ) admin_client = AIOKafkaAdminClient( bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS @@ -108,6 +114,42 @@ async def _init_topics(self) -> None: topic_names = [t.name for t in topics_to_create_metadata] logger.info("Creating missing Kafka topics: %s", topic_names) await admin_client.create_topics(topics_to_create_metadata) + + # --- Enforce short retention on real-time pipeline topics --- + # 5 minutes is more than enough for any active session; anything + # older is from a dead session and should be discarded automatically. + # This works whether the topic was just created or already existed. + PIPELINE_TOPICS = [ + AUDIO_RAW, + AUDIO_SYNTHESIZED, + TEXT_ORIGINAL, + TEXT_TRANSLATED, + ] + RETENTION_MS = "300000" # 5 minutes + + try: + from aiokafka.admin import ConfigResource + + config_resources = [ + ConfigResource( + resource_type="topic", + name=topic, + configs={"retention.ms": RETENTION_MS}, + ) + for topic in PIPELINE_TOPICS + ] + await admin_client.alter_configs(config_resources) + logger.info( + "Set retention.ms=%s on pipeline topics: %s", + RETENTION_MS, + PIPELINE_TOPICS, + ) + except Exception as alter_err: + error_safe = sanitize_log_args(alter_err)[0] + logger.warning( + "Could not set topic retention (non-fatal): %s", error_safe + ) + except Exception as e: error_safe = sanitize_log_args(e)[0] logger.warning("Failed to auto-create Kafka topics: %s", error_safe) diff --git a/app/modules/meeting/ws_router.py b/app/modules/meeting/ws_router.py index 889f464..bd3f62c 100644 --- a/app/modules/meeting/ws_router.py +++ b/app/modules/meeting/ws_router.py @@ -219,7 +219,7 @@ async def egress_task() -> None: # to this consumer and auto_offset_reset="latest" starts from the tail. # A group-less consumer in AIOKafka requires explicit partition.assign() # which is unreliable on a freshly-joined broker; a unique group is simpler. - group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}" + group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}" consumer = AIOKafkaConsumer( AUDIO_SYNTHESIZED, bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, @@ -240,11 +240,6 @@ async def egress_task() -> None: egress_ready.set() # Unblock ingest so ingest can still run return - logger.info( - "Egress consumer ready. group=%s listening_language=%s", - sanitize_for_log(group_id), - sanitize_for_log(listening_language), - ) egress_ready.set() # Signal that we are ready to receive # Track the highest sequence seen to drop stale frames arriving out-of-order @@ -256,15 +251,6 @@ async def egress_task() -> None: event = SynthesizedAudioEvent.model_validate(msg.value) payload = event.payload - logger.info( - "Egress received: room=%s target_lang=%s" - " listening_lang=%s seq=%d", - sanitize_for_log(payload.room_id), - sanitize_for_log(payload.target_language), - sanitize_for_log(listening_language), - sanitize_for_log(payload.sequence_number), - ) - # Filter by Room if payload.room_id != room_code: continue @@ -306,12 +292,6 @@ async def egress_task() -> None: audio_bytes = base64.b64decode(payload.audio_data) try: await websocket.send_bytes(audio_bytes) - logger.info( - "Egress: sent %d bytes to user=%s (seq=%d)", - len(audio_bytes), - sanitize_for_log(user_id), - sanitize_for_log(payload.sequence_number), - ) except Exception as send_err: logger.warning( "Egress: WebSocket send failed for user=%s: %s", diff --git a/app/services/stt_worker.py b/app/services/stt_worker.py index 2c7cbd5..e9a4345 100644 --- a/app/services/stt_worker.py +++ b/app/services/stt_worker.py @@ -38,6 +38,11 @@ class STTWorker(BaseConsumer): group_id = "stt-worker-group" event_schema = AudioChunkEvent + # Skip audio chunks older than 2 minutes — they belong to sessions whose + # room IDs no longer exist in Redis, so the translation worker would find + # no participants and produce nothing anyway. + max_message_age_ms = 120_000 # 2 minutes + async def handle(self, event: BaseEvent[Any]) -> None: """Process a single audio chunk: decode → STT → publish transcript. diff --git a/app/services/translation_worker.py b/app/services/translation_worker.py index 243fd87..88702fa 100644 --- a/app/services/translation_worker.py +++ b/app/services/translation_worker.py @@ -43,6 +43,7 @@ class TranslationWorker(BaseConsumer): topic = TEXT_ORIGINAL group_id = "translation-worker-group" event_schema = TranscriptionEvent + max_message_age_ms = 120_000 # skip transcriptions from dead sessions def __init__(self, producer: object) -> None: super().__init__(producer=producer) diff --git a/app/services/tts_worker.py b/app/services/tts_worker.py index 503e5bb..29d0382 100644 --- a/app/services/tts_worker.py +++ b/app/services/tts_worker.py @@ -47,6 +47,7 @@ class TTSWorker(BaseConsumer): topic = TEXT_TRANSLATED group_id = "tts-worker-group" event_schema = TranslationEvent + max_message_age_ms = 120_000 # skip translations from dead sessions async def handle(self, event: BaseEvent[Any]) -> None: """Process a translation: synthesize audio → publish.