diff --git a/app/core/config.py b/app/core/config.py index edd8d04..7884724 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -119,10 +119,10 @@ class Settings(BaseSettings): FRONTEND_BASE_URL: str = "http://localhost:4200" # CORS - CORS_ORIGINS: list[str] = [ - "http://localhost:4200", - "https://spoken-frontend.onrender.com", - ] + @property + def CORS_ORIGINS(self) -> list[str]: + origins = [self.FRONTEND_BASE_URL.rstrip("/")] if self.FRONTEND_BASE_URL else [] + return [o for o in origins if o] model_config = SettingsConfigDict( env_file=".env", case_sensitive=True, extra="ignore" diff --git a/app/db/session.py b/app/db/session.py index dbe86ec..778188f 100644 --- a/app/db/session.py +++ b/app/db/session.py @@ -70,7 +70,10 @@ def get_engine() -> Engine: # pool_recycle handles server-side idle timeouts (Supabase/DigitalOcean). connect_args = {} if DATABASE_URL.startswith("postgresql"): - connect_args["sslmode"] = "require" + if "localhost" in DATABASE_URL: + connect_args["sslmode"] = "disable" + else: + connect_args["sslmode"] = "require" try: cached_engine = create_engine( diff --git a/app/external_services/deepgram/service.py b/app/external_services/deepgram/service.py index bbcca18..0b333a5 100644 --- a/app/external_services/deepgram/service.py +++ b/app/external_services/deepgram/service.py @@ -29,6 +29,13 @@ class DeepgramSTTService: def __init__(self, timeout: float = 10.0) -> None: self._timeout = timeout + self._client: httpx.AsyncClient | None = None + + @property + def client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client async def transcribe( self, @@ -63,14 +70,13 @@ async def transcribe( } start = time.monotonic() - async with httpx.AsyncClient(timeout=self._timeout) as client: - response = await client.post( - settings.DEEPGRAM_API_URL, - headers=headers, - params=params, - content=audio_bytes, - ) - response.raise_for_status() + response = await self.client.post( + settings.DEEPGRAM_API_URL, + headers=headers, + params=params, + content=audio_bytes, + ) + response.raise_for_status() elapsed_ms = (time.monotonic() - start) * 1000 logger.debug("Deepgram STT completed in %.1fms", elapsed_ms) diff --git a/app/external_services/deepl/service.py b/app/external_services/deepl/service.py index 22466b9..18e78a9 100644 --- a/app/external_services/deepl/service.py +++ b/app/external_services/deepl/service.py @@ -55,6 +55,13 @@ class DeepLTranslationService: def __init__(self, timeout: float = 10.0) -> None: self._timeout = timeout + self._client: httpx.AsyncClient | None = None + + @property + def client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client async def translate( self, @@ -88,13 +95,12 @@ async def translate( payload["source_lang"] = deepl_source start = time.monotonic() - async with httpx.AsyncClient(timeout=self._timeout) as client: - response = await client.post( - settings.DEEPL_API_URL, - headers=headers, - json=payload, - ) - response.raise_for_status() + response = await self.client.post( + settings.DEEPL_API_URL, + headers=headers, + json=payload, + ) + response.raise_for_status() elapsed_ms = (time.monotonic() - start) * 1000 logger.debug("DeepL translation completed in %.1fms", elapsed_ms) @@ -131,6 +137,13 @@ class OpenAITranslationFallback: def __init__(self, timeout: float = 15.0) -> None: self._timeout = timeout + self._client: httpx.AsyncClient | None = None + + @property + def client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client async def translate( self, @@ -176,13 +189,12 @@ async def translate( } start = time.monotonic() - async with httpx.AsyncClient(timeout=self._timeout) as client: - response = await client.post( - "https://api.openai.com/v1/chat/completions", - headers=headers, - json=payload, - ) - response.raise_for_status() + response = await self.client.post( + "https://api.openai.com/v1/chat/completions", + headers=headers, + json=payload, + ) + response.raise_for_status() elapsed_ms = (time.monotonic() - start) * 1000 logger.debug("OpenAI translation fallback completed in %.1fms", elapsed_ms) diff --git a/app/external_services/openai_tts/service.py b/app/external_services/openai_tts/service.py index da94b20..0a32a53 100644 --- a/app/external_services/openai_tts/service.py +++ b/app/external_services/openai_tts/service.py @@ -33,6 +33,13 @@ class OpenAITTSService: def __init__(self, timeout: float = 15.0) -> None: self._timeout = timeout + self._client: httpx.AsyncClient | None = None + + @property + def client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client async def synthesize( self, @@ -68,13 +75,12 @@ async def synthesize( } start = time.monotonic() - async with httpx.AsyncClient(timeout=self._timeout) as client: - response = await client.post( - settings.OPENAI_TTS_API_URL, - headers=headers, - json=payload, - ) - response.raise_for_status() + response = await self.client.post( + settings.OPENAI_TTS_API_URL, + headers=headers, + json=payload, + ) + response.raise_for_status() elapsed_ms = (time.monotonic() - start) * 1000 logger.debug("OpenAI TTS completed in %.1fms", elapsed_ms) diff --git a/app/external_services/voiceai/service.py b/app/external_services/voiceai/service.py index 9e3747e..04e5d4b 100644 --- a/app/external_services/voiceai/service.py +++ b/app/external_services/voiceai/service.py @@ -36,6 +36,13 @@ class VoiceAITTSService: def __init__(self, timeout: float = 60.0) -> None: self._timeout = timeout + self._client: httpx.AsyncClient | None = None + + @property + def client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient(timeout=self._timeout) + return self._client async def synthesize( self, @@ -83,22 +90,20 @@ async def synthesize( "temperature": 1, "top_p": 0.8, } - print(f"Voice.ai Audio format: {audio_format}") + logger.debug("Voice.ai Audio format: %s", audio_format) if voice_id: payload["voice_id"] = voice_id start = time.monotonic() - async with httpx.AsyncClient(timeout=self._timeout) as client: - response = await client.post( - settings.VOICEAI_TTS_API_URL, - headers=headers, - json=payload, - ) - response.raise_for_status() + response = await self.client.post( + settings.VOICEAI_TTS_API_URL, + headers=headers, + json=payload, + ) + response.raise_for_status() elapsed_ms = (time.monotonic() - start) * 1000 - print(f"Voice.ai TTS API completed in {elapsed_ms} ms") - logger.debug("Voice.ai TTS completed in %.1fms", elapsed_ms) + logger.debug("Voice.ai TTS API completed in %.1fms", elapsed_ms) return { "audio_bytes": response.content, diff --git a/app/kafka/consumer.py b/app/kafka/consumer.py index fef85ba..d8a7513 100644 --- a/app/kafka/consumer.py +++ b/app/kafka/consumer.py @@ -123,7 +123,7 @@ async def _consume_loop(self) -> None: age_ms = _time.time() * 1000 - msg.timestamp if age_ms > self.max_message_age_ms: topic_safe = sanitize_log_args(self.topic)[0] - logger.debug( + logger.info( "Skipping stale message on '%s' (age=%.0fms > limit=%dms)", topic_safe, age_ms, diff --git a/app/kafka/manager.py b/app/kafka/manager.py index 6690b05..ae10102 100644 --- a/app/kafka/manager.py +++ b/app/kafka/manager.py @@ -93,10 +93,19 @@ async def _init_topics(self) -> None: await admin_client.start() try: # DLQ topics for each required topic + standard topics + PIPELINE_TOPIC_SET = { + AUDIO_RAW, + AUDIO_SYNTHESIZED, + TEXT_ORIGINAL, + TEXT_TRANSLATED, + } new_topics = [] for topic in TOPICS_TO_CREATE: + partitions = 3 if topic in PIPELINE_TOPIC_SET else 1 new_topics.append( - NewTopic(name=topic, num_partitions=1, replication_factor=1) + NewTopic( + name=topic, num_partitions=partitions, replication_factor=1 + ) ) new_topics.append( NewTopic( diff --git a/app/modules/auth/constants.py b/app/modules/auth/constants.py index 3404378..f9f617f 100644 --- a/app/modules/auth/constants.py +++ b/app/modules/auth/constants.py @@ -15,3 +15,10 @@ class SupportedLanguage(enum.StrEnum): SPANISH = "es" ITALIAN = "it" PORTUGUESE = "pt" + DUTCH = "nl" + POLISH = "pl" + RUSSIAN = "ru" + JAPANESE = "ja" + CHINESE = "zh" + KOREAN = "ko" + TURKISH = "tr" diff --git a/app/modules/meeting/constants.py b/app/modules/meeting/constants.py index 7fafb54..c98485e 100644 --- a/app/modules/meeting/constants.py +++ b/app/modules/meeting/constants.py @@ -46,6 +46,40 @@ class InvitationStatus(enum.StrEnum): MSG_INVITATIONS_SENT = "Meeting invitations sent." +# ── Supported Languages ─────────────────────────────────────────────── +SUPPORTED_LANGUAGES: Final[set[str]] = { + "en", # English + "de", # German + "fr", # French + "es", # Spanish + "it", # Italian + "pt", # Portuguese + "nl", # Dutch + "pl", # Polish + "ru", # Russian + "ja", # Japanese + "zh", # Chinese + "ko", # Korean + "tr", # Turkish +} + +LANGUAGE_NAMES: Final[dict[str, str]] = { + "en": "English", + "de": "German", + "fr": "French", + "es": "Spanish", + "it": "Italian", + "pt": "Portuguese", + "nl": "Dutch", + "pl": "Polish", + "ru": "Russian", + "ja": "Japanese", + "zh": "Chinese", + "ko": "Korean", + "tr": "Turkish", +} + + # ── Redis Key Patterns ──────────────────────────────────────────────── def key_room_participants(room_code: str) -> str: return f"room:{room_code}:participants" diff --git a/app/modules/meeting/schemas.py b/app/modules/meeting/schemas.py index fa7a993..bfd6977 100644 --- a/app/modules/meeting/schemas.py +++ b/app/modules/meeting/schemas.py @@ -7,7 +7,9 @@ import uuid from datetime import datetime -from pydantic import BaseModel, ConfigDict, EmailStr, Field +from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator + +from app.modules.meeting.constants import SUPPORTED_LANGUAGES # ── Request schemas ─────────────────────────────────────────────────── @@ -62,6 +64,20 @@ class JoinRoomRequest(BaseModel): "Used for STT source language selection.", ) + @field_validator("listening_language", "speaking_language", mode="before") + @classmethod + def validate_language_code(cls, v: str | None) -> str | None: + """Ensure language codes are supported ISO 639-1 values.""" + if v is None: + return v + code = v.strip().lower() + if code not in SUPPORTED_LANGUAGES: + supported = ", ".join(sorted(SUPPORTED_LANGUAGES)) + raise ValueError( + f"Unsupported language '{code}'. Supported languages: {supported}" + ) + return code + # ── Response schemas ────────────────────────────────────────────────── diff --git a/app/modules/meeting/state.py b/app/modules/meeting/state.py index 30c9fe8..b9b95f0 100644 --- a/app/modules/meeting/state.py +++ b/app/modules/meeting/state.py @@ -1,7 +1,7 @@ """Meeting ephemeral Redis State Service module. -Generates atomic mapping tracking natively memory limits smoothly defining -targets natively. +Manages live meeting state (lobby, participants, active speaker) using Redis +for high-performance ephemeral storage. """ import json @@ -23,11 +23,10 @@ class MeetingStateService: - """Manages ephemeral live room state (lobby, participants presence, - active speaker) in Redis. + """Manages ephemeral live room state (lobby, participants, active speaker) + in Redis. - All operations are asynchronous and hit Redis directly smoothly handling - maps natively seamlessly. + All operations are asynchronous and interact with Redis directly. """ def __init__(self, redis_client: aioredis.Redis | None = None) -> None: @@ -48,16 +47,13 @@ async def add_participant( """Add or update a user's presence in the active room participants hash. Args: - room_code (str): Identity parameter dynamically natively resolving - identifiers. - user_id (str): User tracker string mapped locally natively limits - logically securely bindings natively. - language (str): Locale configuration gracefully array mapping. - hardware_ready (bool): Configuration map dynamically natively smoothly - correctly natively tracking gracefully gracefully locally securely - smoothly gracefully tracking natively handled array limit logically - seamlessly bounds dynamically safely correctly securely limits - correctly dynamically. + room_code: The room's unique identifier code. + user_id: The participant's unique identifier. + language: The participant's listening language (ISO 639-1). + speaking_language: The participant's speaking language (ISO 639-1). + hardware_ready: Whether the user's media devices are ready. + display_name: The participant's display name. + role: The participant's role (host, guest, participant). """ state = { "status": "connected", @@ -80,11 +76,8 @@ async def remove_participant(self, room_code: str, user_id: str) -> None: """Remove a user from the active participants hash. Args: - room_code (str): Identity string naturally resolving natively - gracefully limits seamlessly dynamically correctly safely mapping - dynamically. - user_id (str): Evaluator tracking string string parameter seamlessly - mapping efficiently limits. + room_code: The room's unique identifier code. + user_id: The participant's unique identifier. """ await cast( "Awaitable[Any]", diff --git a/app/modules/meeting/ws_dependencies.py b/app/modules/meeting/ws_dependencies.py index ff3bb4d..40bfe55 100644 --- a/app/modules/meeting/ws_dependencies.py +++ b/app/modules/meeting/ws_dependencies.py @@ -22,7 +22,7 @@ def authenticate_ws(token: str = Query(...), db: Session = Depends(get_db)) -> str: - """Validate the provided JWT token for a WebSocket connection natively correctly. + """Validate the provided JWT token for a WebSocket connection. Works for both Authenticated Users (who present an access token) and Guests (who present a guest token). @@ -73,8 +73,7 @@ def authenticate_ws(token: str = Query(...), db: Session = Depends(get_db)) -> s async def assert_room_participant(room_code: str, user_id: str) -> dict: - """Ensure the user has successfully joined the room mapping effectively - logically optimally accurately natively securely. + """Ensure the user has successfully joined the room mapping. Checks the Redis active participant list managed by MeetingStateService. If the user has not called POST /meetings/{room}/join, they cannot @@ -83,14 +82,10 @@ async def assert_room_participant(room_code: str, user_id: str) -> dict: Args: room_code (str): Video space tracking parameter tracking efficiently statically mapping accurately correctly logically structurally. - user_id (str): Authenticated marker mapped cleanly seamlessly efficiently - effectively dynamically dynamically effectively precisely safely - gracefully natively. + user_id (str): Authenticated marker mapped. Returns: - dict: The participant state dictionary gracefully smoothly mapping correctly - statically mappings effortlessly automatically intuitively organically - smoothly. + dict: The participant state dictionary. """ state_service = MeetingStateService() participants = await state_service.get_participants(room_code) diff --git a/app/modules/meeting/ws_router.py b/app/modules/meeting/ws_router.py index bd3f62c..698277c 100644 --- a/app/modules/meeting/ws_router.py +++ b/app/modules/meeting/ws_router.py @@ -1,7 +1,6 @@ """Meeting WebSockets Integrations module. -WebSocket endpoints for real-time signaling, audio streaming, and captions -seamlessly intelligently reliably. +WebSocket endpoints for real-time signaling, audio streaming, and captions. """ import asyncio @@ -10,12 +9,9 @@ import logging import time -from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped] from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect -from app.core.config import settings from app.core.sanitize import log_sanitizer, sanitize_for_log -from app.kafka.topics import AUDIO_SYNTHESIZED, TEXT_ORIGINAL, TEXT_TRANSLATED from app.modules.meeting.state import MeetingStateService from app.modules.meeting.ws_dependencies import assert_room_participant, authenticate_ws from app.schemas.pipeline import ( @@ -35,17 +31,12 @@ async def signaling_websocket( room_code: str, user_id: str = Depends(authenticate_ws), ) -> None: - """Relays WebRTC Offer, Answer, and ICE Candidate messages between peers - naturally cleanly mappings logically confidently reliably elegantly optimally - successfully accurately efficiently correctly accurately dynamically smoothly - gracefully cleanly successfully reliably optimally cleanly successfully. + """Relays WebRTC Offer, Answer, and ICE Candidate messages between peers. Args: - websocket (WebSocket): Protocol mapping gracefully effectively gracefully - efficiently seamlessly cleanly natively efficiently intelligently. - room_code (str): Video URL param effectively efficiently dynamically - gracefully successfully locally. - user_id (str): Extracted authenticated bounds safely cleanly reliably smoothly. + websocket (WebSocket): Protocol mapping. + room_code (str): Video URL param. + user_id (str): Extracted authenticated bounds. """ try: participant_state = await assert_room_participant(room_code, user_id) @@ -121,19 +112,12 @@ async def audio_websocket( # noqa: C901 room_code: str, user_id: str = Depends(authenticate_ws), ) -> None: - """Bidirectional audio stream structurally confidently perfectly beautifully - intelligently flawlessly gracefully stably cleanly successfully robustly - gracefully optimally logically carefully successfully elegantly. + """Bidirectional audio stream. Args: - websocket (WebSocket): Protocol native tracker cleanly cleanly gracefully - elegantly perfectly beautifully accurately neatly effectively. - room_code (str): Room id safely neatly accurately intelligently seamlessly - properly carefully smoothly nicely smartly correctly beautifully safely - perfectly cleanly cleanly. - user_id (str): Authenticated limit string naturally cleanly neatly gracefully - intelligently smartly beautifully seamlessly safely correctly reliably - beautifully cleanly carefully. + websocket (WebSocket): Protocol native tracker. + room_code (str): Room id. + user_id (str): Authenticated limit string. """ try: participant_state = await assert_room_participant(room_code, user_id) @@ -184,8 +168,8 @@ async def ingest_task() -> None: room_id=room_code, user_id=user_id, audio_bytes=chunk, - # Use speaking_language (what the user speaks) — not - # listening_language (what they want to hear) so STT + # Use speaking_language — not + # listening_language so STT # transcribes in the correct language. source_language=participant_state.get( "speaking_language", "en" @@ -214,74 +198,55 @@ async def ingest_task() -> None: egress_ready = asyncio.Event() async def egress_task() -> None: - """Reads Kafka synthesized audio, filters for user, writes to WS.""" - # Use a unique group_id per connection so Kafka assigns all partitions - # to this consumer and auto_offset_reset="latest" starts from the tail. - # A group-less consumer in AIOKafka requires explicit partition.assign() - # which is unreliable on a freshly-joined broker; a unique group is simpler. - group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}" - consumer = AIOKafkaConsumer( - AUDIO_SYNTHESIZED, - bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, - group_id=group_id, - auto_offset_reset="latest", - value_deserializer=lambda v: json.loads(v.decode("utf-8")), - enable_auto_commit=True, - ) - try: - await consumer.start() - except Exception as start_err: - logger.error( - "Egress consumer failed to start for user=%s room=%s: %s", - sanitize_for_log(user_id), - sanitize_for_log(room_code), - start_err, - ) - egress_ready.set() # Unblock ingest so ingest can still run - return + """Reads Redis Pub/Sub synthesized audio, filters for user, writes to WS.""" + from app.modules.auth.token_store import _get_redis_client + + redis = _get_redis_client() + pubsub = redis.pubsub() + channel = f"pipeline:audio:{room_code}" + await pubsub.subscribe(channel) egress_ready.set() # Signal that we are ready to receive - # Track the highest sequence seen to drop stale frames arriving out-of-order + # Track the highest sequence seen to drop stale frames highest_seq: dict[str, int] = {} + # Cache participant count to avoid per-frame Redis lookups + _cached_participant_count: int = 0 + _cache_ts: float = 0.0 try: - async for msg in consumer: + async for message in pubsub.listen(): + if message["type"] != "message": + continue + try: - event = SynthesizedAudioEvent.model_validate(msg.value) + event = SynthesizedAudioEvent.model_validate( + json.loads(message["data"]) + ) payload = event.payload - # Filter by Room - if payload.room_id != room_code: - continue + # Room filter is implicit (we subscribed to room-specific channel) + + # Language filter with cached participant count + now = time.time() + if now - _cache_ts > 5.0: + participants = await MeetingStateService().get_participants( + room_code + ) + _cached_participant_count = len(participants) + _cache_ts = now - # Language filter: In production with multiple participants, - # only deliver audio matching the listener's language. - # For single-user testing, skip the filter so the speaker - # can hear their own translated audio. - participants = await MeetingStateService().get_participants( - room_code - ) if ( - len(participants) > 1 + _cached_participant_count > 1 and payload.target_language != listening_language ): - logger.debug( - "Egress: skipping lang mismatch target=%s != listening=%s", - sanitize_for_log(payload.target_language), - sanitize_for_log(listening_language), - ) continue - # Stale frame guard (drop if more than 10 sequences behind latest) + # Stale frame guard (drop if more than 10 sequences behind) speaker_key = payload.user_id current_highest = highest_seq.get(speaker_key, -1) if payload.sequence_number < current_highest - 10: - logger.debug( - "Dropped stale audio frame from %s", - sanitize_for_log(speaker_key), - ) continue highest_seq[speaker_key] = max( @@ -298,12 +263,15 @@ async def egress_task() -> None: sanitize_for_log(user_id), send_err, ) + break except Exception as frame_err: logger.exception("Error processing egress frame: %s", frame_err) + except asyncio.CancelledError: + pass finally: - await consumer.stop() + await pubsub.unsubscribe(channel) async def guarded_ingest_task() -> None: """Wait for egress consumer to be ready, then start ingesting.""" @@ -332,9 +300,8 @@ async def captions_websocket( room_code: str, user_id: str = Depends(authenticate_ws), ) -> None: - """Broadcasts original and translated transcription events.""" + """Broadcasts original and translated transcription events via Redis Pub/Sub.""" try: - # Validate they are in the room, but we don't strictly *need* their state _ = await assert_room_participant(room_code, user_id) except Exception as e: await websocket.close(code=1008, reason=str(e)) @@ -342,47 +309,28 @@ async def captions_websocket( await websocket.accept() - # Use a persistent user-specific group so reconnects don't drop captions - # Note: "Subscribe from now" is handled via auto_offset_reset="latest" - # in their group creation or by wiping the group offsets. - # We'll use a dynamic timestamp group to force "latest". - consumer = AIOKafkaConsumer( - TEXT_ORIGINAL, - TEXT_TRANSLATED, - bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, - group_id=f"captions-{room_code}-{user_id}-{int(time.time())}", - auto_offset_reset="latest", - value_deserializer=lambda v: json.loads(v.decode("utf-8")), - ) + from app.modules.auth.token_store import _get_redis_client - await consumer.start() + redis = _get_redis_client() + pubsub = redis.pubsub() + channel = f"pipeline:captions:{room_code}" + await pubsub.subscribe(channel) try: - async for msg in consumer: - payload_data = msg.value.get("payload", {}) - if payload_data.get("room_id") != room_code: + async for message in pubsub.listen(): + if message["type"] != "message": continue - # Build unified caption response depending on topic - is_translation = msg.topic == TEXT_TRANSLATED - - caption_msg = { - "event": "caption", - "speaker_id": payload_data.get("user_id"), - "is_final": payload_data.get("is_final", True), - "timestamp_ms": int(time.time() * 1000), - } - - if is_translation: - caption_msg["language"] = payload_data.get("target_language") - caption_msg["text"] = payload_data.get("translated_text") - else: - caption_msg["language"] = payload_data.get("source_language") - caption_msg["text"] = payload_data.get("text") - - await websocket.send_json(caption_msg) + try: + caption_data = json.loads(message["data"]) + # Only forward captions for this room (implicit via channel) + await websocket.send_json(caption_data) + except Exception as frame_err: + logger.warning("Error processing caption frame: %s", frame_err) except WebSocketDisconnect: pass + except asyncio.CancelledError: + pass finally: - await consumer.stop() + await pubsub.unsubscribe(channel) diff --git a/app/schemas/pipeline.py b/app/schemas/pipeline.py index d323eef..33a40a8 100644 --- a/app/schemas/pipeline.py +++ b/app/schemas/pipeline.py @@ -37,14 +37,13 @@ class AudioChunkPayload(BaseModel): """Payload for a single audio chunk from a WebSocket client. Attributes: - room_id: Room the audio originates from securely mapped. + room_id: Room the audio originates from. user_id: Speaker's tracking ID (user UUID or guest session UUID). sequence_number: Monotonically increasing chunk index. - audio_data: Base64-encoded raw audio bytes manually structured natively - smoothly. - sample_rate: Audio sample rate natively mapping efficiently. - encoding: Audio encoding format mapped explicitly. - source_language: Speaker's language reliably securely nicely comfortably. + audio_data: Base64-encoded raw audio bytes. + sample_rate: Audio sample rate in Hz. + encoding: Audio encoding format. + source_language: Speaker's language code (ISO 639-1). """ room_id: str = Field(..., description="Room the audio originates from.") @@ -68,7 +67,7 @@ class AudioChunkEvent(BaseEvent[AudioChunkPayload]): """Kafka event wrapping a raw audio chunk for the STT stage. Attributes: - event_type: String constant resolving seamlessly logically statically. + event_type: Kafka event type identifier for audio chunks. """ event_type: str = "audio.chunk" @@ -81,15 +80,13 @@ class TranscriptionPayload(BaseModel): """Payload produced by the STT worker. Attributes: - room_id: Active tracker explicitly identifying organically flawlessly - dynamically mapped. - user_id: Connected speaker logically securely confidently dependably - smoothly. - sequence_number: Ordered limit elegantly flawlessly appropriately stably. - text: Transcribed result mapped automatically perfectly. + room_id: Room the transcription belongs to. + user_id: Speaker who produced the audio. + sequence_number: Ordered chunk index from the source audio. + text: Transcribed text from the audio chunk. source_language: Detected or declared source language. - is_final: Check bounds effectively naturally flawlessly. - confidence: Float organically cleanly cleanly successfully. + is_final: Whether this is a final transcription or an interim result. + confidence: STT confidence score (0.0 to 1.0). """ room_id: str @@ -124,19 +121,13 @@ class TranslationPayload(BaseModel): """Payload produced by the Translation worker. Attributes: - room_id: Active room identifier for the translation. - user_id: Participant rationally fluently suitably rationally cleanly - explicitly cleanly organically successfully realistically correctly - properly. - sequence_number: Stream limit intelligently cleanly comfortably naturally - effectively perfectly. - original_text: Initial text before translation. - translated_text: Resulting text after translation. - source_language: Identity rationally predictably optimally accurately - effortlessly structurally accurately elegantly optimally intelligently - fluently. - target_language: Target effectively elegantly successfully mapping - efficiently flawlessly seamlessly cleanly correctly securely accurately. + room_id: Room the translation belongs to. + user_id: Original speaker's tracking ID. + sequence_number: Ordered chunk index from the source transcription. + original_text: Text before translation. + translated_text: Text after translation. + source_language: ISO 639-1 code of the original language. + target_language: ISO 639-1 code of the translation target. """ room_id: str diff --git a/app/services/audio_bridge.py b/app/services/audio_bridge.py index 9a33fbd..d94c4b4 100644 --- a/app/services/audio_bridge.py +++ b/app/services/audio_bridge.py @@ -94,11 +94,13 @@ async def publish_audio_chunk( kafka = get_kafka_manager() await kafka.producer.send(AUDIO_RAW, event, key=room_id) - logger.debug( - "Published audio chunk seq=%d for user=%s in room=%s", + logger.info( + "INGEST: seq=%d room=%s user=%s lang=%s size=%d bytes", seq, - log_sanitizer.sanitize(user_id), log_sanitizer.sanitize(room_id), + log_sanitizer.sanitize(user_id), + source_language, + len(audio_bytes), ) diff --git a/app/services/stt_worker.py b/app/services/stt_worker.py index e9a4345..08aa304 100644 --- a/app/services/stt_worker.py +++ b/app/services/stt_worker.py @@ -38,13 +38,24 @@ class STTWorker(BaseConsumer): group_id = "stt-worker-group" event_schema = AudioChunkEvent + # Buffer configuration + BUFFER_SIZE = 5 # Number of 100ms chunks to buffer (500ms total) + + def __init__(self, producer: Any) -> None: + super().__init__(producer) + # Store buffers per user in room: { "room:user": [chunk1, chunk2, ...] } + self._audio_buffers: dict[str, list[bytes]] = {} + self._buffer_timestamps: dict[str, float] = {} + # Skip audio chunks older than 2 minutes — they belong to sessions whose # room IDs no longer exist in Redis, so the translation worker would find # no participants and produce nothing anyway. max_message_age_ms = 120_000 # 2 minutes async def handle(self, event: BaseEvent[Any]) -> None: - """Process a single audio chunk: decode → STT → publish transcript. + """Process a single audio chunk with buffering. + + Collect → decode → STT → publish. Args: event (BaseEvent[Any]): The deserialized wrapper containing the @@ -55,17 +66,29 @@ async def handle(self, event: BaseEvent[Any]) -> None: pipeline_start = time.monotonic() - # 1. Decode base64 audio + # 1. Decode and Buffer audio_bytes = base64.b64decode(payload.audio_data) - if not audio_bytes: - logger.warning( - "Empty audio chunk seq=%d from user=%s, skipping", - payload.sequence_number, - payload.user_id, - ) return + buffer_key = f"{payload.room_id}:{payload.user_id}" + if buffer_key not in self._audio_buffers: + self._audio_buffers[buffer_key] = [] + + self._audio_buffers[buffer_key].append(audio_bytes) + self._buffer_timestamps[buffer_key] = time.monotonic() + + # Periodically sweep stale buffers (older than 60 seconds) + self._sweep_stale_buffers() + + # Only proceed if we have enough chunks to make transcription viable + if len(self._audio_buffers[buffer_key]) < self.BUFFER_SIZE: + return + + # Concatenate buffered chunks + full_audio = b"".join(self._audio_buffers[buffer_key]) + self._audio_buffers[buffer_key] = [] # Clear buffer for next cycle + # 2. Call Deepgram STT (or Mock it if no API Key provided) from app.core.config import settings @@ -81,7 +104,7 @@ async def handle(self, event: BaseEvent[Any]) -> None: else: stt_service = get_deepgram_stt_service() result = await stt_service.transcribe( - audio_bytes, + full_audio, language=payload.source_language, sample_rate=payload.sample_rate, encoding=payload.encoding.value, @@ -89,11 +112,7 @@ async def handle(self, event: BaseEvent[Any]) -> None: text = result.get("text", "").strip() if not text: - logger.debug( - "No speech detected in chunk seq=%d from user=%s", - payload.sequence_number, - payload.user_id, - ) + # If still no text after 500ms, it's likely just background noise/silence return # 3. Build and publish transcription event @@ -128,12 +147,34 @@ async def handle(self, event: BaseEvent[Any]) -> None: }, ) ) - # Fix RUF006: Store a reference to the task to avoid garbage collection self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) except Exception as e: logger.error("Failed to broadcast active speaker: %s", e) + # Publish transcription caption to Redis Pub/Sub for real-time delivery + try: + import json as _json + + from app.modules.auth.token_store import _get_redis_client + + redis = _get_redis_client() + caption_msg = { + "event": "caption", + "speaker_id": payload.user_id, + "text": text, + "language": transcription_payload.source_language, + "is_final": True, + "is_translation": False, + "timestamp_ms": int(time.time() * 1000), + } + await redis.publish( + f"pipeline:captions:{payload.room_id}", + _json.dumps(caption_msg), + ) + except Exception as redis_err: + logger.warning("Redis caption publish failed: %s", redis_err) + # 4. Log pipeline latency elapsed_ms = (time.monotonic() - pipeline_start) * 1000 logger.info( @@ -141,7 +182,19 @@ async def handle(self, event: BaseEvent[Any]) -> None: payload.sequence_number, payload.room_id, payload.user_id, - text[:50], + text, result.get("confidence", 0.0), elapsed_ms, ) + + def _sweep_stale_buffers(self) -> None: + """Remove audio buffers that haven't received new chunks in 60 seconds.""" + now = time.monotonic() + stale_keys = [ + key for key, ts in self._buffer_timestamps.items() if now - ts > 60.0 + ] + for key in stale_keys: + self._audio_buffers.pop(key, None) + self._buffer_timestamps.pop(key, None) + if stale_keys: + logger.debug("Swept %d stale audio buffer(s)", len(stale_keys)) diff --git a/app/services/translation_worker.py b/app/services/translation_worker.py index 88702fa..252f67f 100644 --- a/app/services/translation_worker.py +++ b/app/services/translation_worker.py @@ -6,6 +6,7 @@ target language to ``text.translated``. """ +import asyncio import logging import time from typing import Any @@ -67,10 +68,10 @@ async def handle(self, event: BaseEvent[Any]) -> None: # 1. Determine target languages from room participants participants = await self._state.get_participants(payload.room_id) - target_languages = { - state.get("language", "en") - for state in participants.values() - if state.get("language", "en") != payload.source_language + target_languages: set[str] = { + str(participant_state.get("language", "en")) + for participant_state in participants.values() + if str(participant_state.get("language", "en")) != payload.source_language } if not target_languages: @@ -81,54 +82,87 @@ async def handle(self, event: BaseEvent[Any]) -> None: ) return - # 2. Translate for each target language - for target_lang in target_languages: + # 2. Translate concurrently for all target languages + async def _translate_one( + target_lang: str, + ) -> tuple[str, str | None, Exception | None]: + """Translate to a single language, returning (lang, text, error).""" try: - translated_text = await self._translate_text( + translated = await self._translate_text( payload.text, source_language=payload.source_language, target_language=target_lang, ) + return target_lang, translated, None + except Exception as exc: + return target_lang, None, exc - if not translated_text: - logger.warning( - "Empty translation for seq=%d target=%s", - payload.sequence_number, - target_lang, - ) - continue - - # 3. Publish translation event - translation_payload = TranslationPayload( - room_id=payload.room_id, - user_id=payload.user_id, - sequence_number=payload.sequence_number, - original_text=payload.text, - translated_text=translated_text, - source_language=payload.source_language, - target_language=target_lang, - ) - translation_event = TranslationEvent(payload=translation_payload) - - await self._producer.send( - TEXT_TRANSLATED, translation_event, key=payload.room_id - ) + results = await asyncio.gather( + *[_translate_one(lang) for lang in target_languages] + ) - logger.debug( - "Translation: seq=%d %s→%s text='%s'", + # 3. Publish successful translations, collect failures + failures: list[tuple[str, Exception]] = [] + for target_lang, translated_text, error in results: + if error is not None: + logger.error( + "Translation failed for seq=%d target=%s: %s", payload.sequence_number, - payload.source_language, target_lang, - translated_text[:50], + error, ) + failures.append((target_lang, error)) + continue - except Exception: - logger.exception( - "Translation failed for seq=%d target=%s", + if not translated_text: + logger.warning( + "Empty translation for seq=%d target=%s", payload.sequence_number, target_lang, ) - raise + continue + + translation_payload = TranslationPayload( + room_id=payload.room_id, + user_id=payload.user_id, + sequence_number=payload.sequence_number, + original_text=payload.text, + translated_text=translated_text, + source_language=payload.source_language, + target_language=target_lang, + ) + translation_event = TranslationEvent(payload=translation_payload) + + await self._producer.send( + TEXT_TRANSLATED, translation_event, key=payload.room_id + ) + + # Publish caption event to Redis Pub/Sub for real-time delivery + try: + await self._publish_caption_to_redis( + room_id=payload.room_id, + speaker_id=payload.user_id, + text=translated_text, + language=target_lang, + is_translation=True, + ) + except Exception as redis_err: + logger.warning("Redis caption publish failed: %s", redis_err) + + logger.debug( + "Translation: seq=%d %s→%s text='%s'", + payload.sequence_number, + payload.source_language, + target_lang, + translated_text[:50], + ) + + # If any translations failed, raise to trigger retry for the whole batch + if failures: + failed_langs = [lang for lang, _ in failures] + raise RuntimeError( + f"Translation failed for {len(failures)} language(s): {failed_langs}" + ) elapsed_ms = (time.monotonic() - pipeline_start) * 1000 logger.info( @@ -192,3 +226,29 @@ async def _translate_text( return f"[Mocked Translation -> {target_language}]: {text}" return str(result.get("translated_text", "")) + + async def _publish_caption_to_redis( + self, + *, + room_id: str, + speaker_id: str, + text: str, + language: str, + is_translation: bool, + ) -> None: + """Publish a caption event to Redis Pub/Sub for real-time WebSocket delivery.""" + import json + + from app.modules.auth.token_store import _get_redis_client + + redis = _get_redis_client() + caption_msg = { + "event": "caption", + "speaker_id": speaker_id, + "text": text, + "language": language, + "is_final": True, + "is_translation": is_translation, + "timestamp_ms": int(time.time() * 1000), + } + await redis.publish(f"pipeline:captions:{room_id}", json.dumps(caption_msg)) diff --git a/app/services/tts_worker.py b/app/services/tts_worker.py index 29d0382..edf26ed 100644 --- a/app/services/tts_worker.py +++ b/app/services/tts_worker.py @@ -97,6 +97,12 @@ async def handle(self, event: BaseEvent[Any]) -> None: await self._producer.send(AUDIO_SYNTHESIZED, synth_event, key=payload.room_id) + # Publish to Redis Pub/Sub for real-time WebSocket egress delivery + try: + await self._publish_audio_to_redis(synth_event) + except Exception as redis_err: + logger.warning("Redis audio egress publish failed: %s", redis_err) + # 4. Log pipeline latency elapsed_ms = (time.monotonic() - pipeline_start) * 1000 logger.info( @@ -130,3 +136,15 @@ async def _synthesize(self, *, text: str, language: str, encoding: str) -> dict: # Default: OpenAI return await get_openai_tts_service().synthesize(text, encoding=encoding) + + async def _publish_audio_to_redis(self, synth_event: SynthesizedAudioEvent) -> None: + """Publish synthesized audio to Redis Pub/Sub for WebSocket egress.""" + import json + + from app.modules.auth.token_store import _get_redis_client + + redis = _get_redis_client() + await redis.publish( + f"pipeline:audio:{synth_event.payload.room_id}", + json.dumps(synth_event.model_dump(), default=str), + ) diff --git a/tests/meeting/test_ws_router.py b/tests/meeting/test_ws_router.py index 6a18ed8..98f7dbd 100644 --- a/tests/meeting/test_ws_router.py +++ b/tests/meeting/test_ws_router.py @@ -51,13 +51,28 @@ def mock_audio_ingest(): @pytest.fixture -def mock_kafka_consumer(): - with patch("app.modules.meeting.ws_router.AIOKafkaConsumer") as mock_consumer_class: - consumer = AsyncMock() - consumer.start = AsyncMock() - consumer.stop = AsyncMock() - mock_consumer_class.return_value = consumer - yield consumer +def mock_redis_client(): + with patch("app.modules.auth.token_store._get_redis_client") as mock_get_redis: + redis_mock = MagicMock() + pubsub_mock = MagicMock() + + async def mock_subscribe(*args, **kwargs): + pass + + async def mock_unsubscribe(*args, **kwargs): + pass + + async def mock_listen(): + if False: + yield + + pubsub_mock.subscribe = mock_subscribe + pubsub_mock.unsubscribe = mock_unsubscribe + pubsub_mock.listen = mock_listen + + redis_mock.pubsub.return_value = pubsub_mock + mock_get_redis.return_value = redis_mock + yield redis_mock @pytest.fixture @@ -98,18 +113,10 @@ def test_signaling_websocket(mock_connection_manager): ) -@pytest.mark.usefixtures("mock_room_participant") +@pytest.mark.usefixtures("mock_room_participant", "mock_redis_client") def test_audio_websocket_ingest( mock_audio_ingest, - mock_kafka_consumer, ): - # Mock __aiter__ to be an async generator - async def mock_aiter(): - if False: - yield - - mock_kafka_consumer.__aiter__.side_effect = mock_aiter - with client.websocket_connect( "/api/v1/ws/audio/room1?token=mock_token" ) as websocket: diff --git a/tests/test_core/__init__.py b/tests/test_core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_kafka/__init__.py b/tests/test_kafka/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_kafka/test_pipeline.py b/tests/test_kafka/test_pipeline.py index f5f52fb..e3bbd79 100644 --- a/tests/test_kafka/test_pipeline.py +++ b/tests/test_kafka/test_pipeline.py @@ -73,6 +73,8 @@ async def test_stt_worker_handle(mock_producer, base_audio_chunk_event): with ( patch("app.services.stt_worker.get_deepgram_stt_service") as mock_get_stt, patch("app.core.config.settings") as mock_settings, + patch("app.services.connection_manager.get_connection_manager") as mock_get_cm, + patch("app.modules.auth.token_store._get_redis_client") as mock_get_redis, ): mock_settings.DEEPGRAM_API_KEY = "fake-key" @@ -84,10 +86,19 @@ async def test_stt_worker_handle(mock_producer, base_audio_chunk_event): } mock_get_stt.return_value = mock_stt_svc - await worker.handle(base_audio_chunk_event) + redis_mock = MagicMock() + redis_mock.publish = AsyncMock() + mock_get_redis.return_value = redis_mock + + cm_mock = MagicMock() + cm_mock.broadcast_to_room = AsyncMock() + mock_get_cm.return_value = cm_mock + + for _ in range(STTWorker.BUFFER_SIZE): + await worker.handle(base_audio_chunk_event) mock_stt_svc.transcribe.assert_called_once_with( - b"fake_audio", + b"fake_audio" * STTWorker.BUFFER_SIZE, language="en", sample_rate=16000, encoding="linear16", @@ -113,6 +124,7 @@ async def test_translation_worker_handle(mock_producer, base_transcription_event ) as mock_get_deepl, patch("app.services.translation_worker.get_openai_translation_fallback"), patch("app.core.config.settings") as mock_settings, + patch("app.modules.auth.token_store._get_redis_client") as mock_get_redis, ): mock_settings.DEEPL_API_KEY = "fake-deepl-key" mock_settings.OPENAI_API_KEY = "fake-openai-key" @@ -127,7 +139,7 @@ async def test_translation_worker_handle(mock_producer, base_transcription_event worker._state = mock_state mock_deepl = AsyncMock() - mock_deepl.supports_language.return_value = True + mock_deepl.supports_language = MagicMock(return_value=True) mock_deepl.translate.side_effect = ( lambda _text, _source_language, target_language: { "translated_text": f"Transl => {target_language}", @@ -136,6 +148,10 @@ async def test_translation_worker_handle(mock_producer, base_transcription_event ) mock_get_deepl.return_value = mock_deepl + redis_mock = MagicMock() + redis_mock.publish = AsyncMock() + mock_get_redis.return_value = redis_mock + await worker.handle(base_transcription_event) # Should translate twice (once for FR, once for ES) @@ -162,6 +178,7 @@ async def test_tts_worker_handle(mock_producer, base_translation_event): with ( patch("app.services.tts_worker.get_openai_tts_service") as mock_get_openai, patch("app.services.tts_worker.settings") as mock_settings, + patch("app.modules.auth.token_store._get_redis_client") as mock_get_redis, ): mock_settings.ACTIVE_TTS_PROVIDER = "openai" mock_settings.PIPELINE_AUDIO_ENCODING = "linear16" @@ -173,6 +190,10 @@ async def test_tts_worker_handle(mock_producer, base_translation_event): } mock_get_openai.return_value = mock_openai + redis_mock = MagicMock() + redis_mock.publish = AsyncMock() + mock_get_redis.return_value = redis_mock + await worker.handle(base_translation_event) mock_openai.synthesize.assert_called_once_with(