diff --git a/app/modules/meeting/service.py b/app/modules/meeting/service.py index 97a0f04..a754dae 100644 --- a/app/modules/meeting/service.py +++ b/app/modules/meeting/service.py @@ -285,6 +285,7 @@ async def _check_lobby_required( tracking_id: str, display_name: str, listening_language: str | None, + speaking_language: str | None, new_guest_token: str | None, live_pts: dict, ) -> dict | None: @@ -319,7 +320,20 @@ async def _check_lobby_required( else: final_lang = "en" - await self.state.add_to_lobby(room_code, tracking_id, display_name, final_lang) + if speaking_language: + final_speak_lang = speaking_language + elif user and user.speaking_language: + final_speak_lang = user.speaking_language + else: + final_speak_lang = "en" + + await self.state.add_to_lobby( + room_code, + tracking_id, + display_name, + final_lang, + speaking_language=final_speak_lang, + ) cm = get_connection_manager() await cm.broadcast_to_room( @@ -445,6 +459,7 @@ async def join_room( tracking_id=tracking_id, display_name=display_name, listening_language=listening_language, + speaking_language=speaking_language, new_guest_token=new_guest_token, live_pts=live_pts, ) diff --git a/app/modules/meeting/state.py b/app/modules/meeting/state.py index 1dd44db..30c9fe8 100644 --- a/app/modules/meeting/state.py +++ b/app/modules/meeting/state.py @@ -11,6 +11,7 @@ import redis.asyncio as aioredis +from app.core.sanitize import sanitize_for_log from app.modules.auth.token_store import _get_redis_client from app.modules.meeting.constants import ( key_room_active_speaker, @@ -106,12 +107,18 @@ async def get_participants(self, room_code: str) -> dict[str, dict]: # ── Lobby Set ──────────────────────────────────────────────────────── async def add_to_lobby( - self, room_code: str, user_id: str, display_name: str, language: str + self, + room_code: str, + user_id: str, + display_name: str, + language: str, + speaking_language: str = "en", ) -> None: """Place a user in the waiting room/lobby hash.""" state = { "display_name": display_name, "language": language, + "speaking_language": speaking_language, } await cast( "Awaitable[Any]", @@ -144,6 +151,7 @@ async def admit_from_lobby(self, room_code: str, user_id: str) -> bool: lobby_state = json.loads(lobby_data_raw) language = lobby_state.get("language", "en") + speaking_language = lobby_state.get("speaking_language", "en") display_name = lobby_state.get("display_name", "") # A lightweight transaction (pipeline) to ensure we don't have partial state @@ -153,6 +161,7 @@ async def admit_from_lobby(self, room_code: str, user_id: str) -> bool: state = { "status": "connected", "language": language, + "speaking_language": speaking_language, "hardware_ready": True, "display_name": display_name, "role": "guest", @@ -198,4 +207,6 @@ async def cleanup_room(self, room_code: str) -> None: ) if keys: await self._redis.delete(*keys) - logger.info("Cleaned up Redis state for room %s", room_code) + logger.info( + "Cleaned up Redis state for room %s", sanitize_for_log(room_code) + ) diff --git a/app/modules/meeting/ws_router.py b/app/modules/meeting/ws_router.py index c371a10..889f464 100644 --- a/app/modules/meeting/ws_router.py +++ b/app/modules/meeting/ws_router.py @@ -9,13 +9,12 @@ import json import logging import time -from pathlib import Path from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped] from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect from app.core.config import settings -from app.core.sanitize import log_sanitizer +from app.core.sanitize import log_sanitizer, sanitize_for_log from app.kafka.topics import AUDIO_SYNTHESIZED, TEXT_ORIGINAL, TEXT_TRANSLATED from app.modules.meeting.state import MeetingStateService from app.modules.meeting.ws_dependencies import assert_room_participant, authenticate_ws @@ -144,7 +143,7 @@ async def audio_websocket( # noqa: C901 listening_language = participant_state.get("language", "en") await websocket.accept() - print("Audio WS client connected: %s", user_id) + logger.info("Audio WS client connected: %s", sanitize_for_log(user_id)) ingest_svc = get_audio_ingest_service() ingest_svc.reset_sequence(f"{room_code}:{user_id}") @@ -185,12 +184,17 @@ async def ingest_task() -> None: room_id=room_code, user_id=user_id, audio_bytes=chunk, - source_language=participant_state.get("language", "en"), + # Use speaking_language (what the user speaks) — not + # listening_language (what they want to hear) so STT + # transcribes in the correct language. + source_language=participant_state.get( + "speaking_language", "en" + ), ) except WebSocketDisconnect: logger.info( "Audio WS client disconnected (WebSocketDisconnect): %s", - log_sanitizer.sanitize(user_id), + sanitize_for_log(user_id), ) except RuntimeError as exc: # Starlette raises RuntimeError once the disconnect frame has been @@ -202,7 +206,7 @@ async def ingest_task() -> None: raise logger.info( "Audio WS ingest RuntimeError (socket already closed) for %s: %s", - log_sanitizer.sanitize(user_id), + sanitize_for_log(user_id), exc, ) @@ -211,34 +215,35 @@ async def ingest_task() -> None: async def egress_task() -> None: """Reads Kafka synthesized audio, filters for user, writes to WS.""" + # Use a unique group_id per connection so Kafka assigns all partitions + # to this consumer and auto_offset_reset="latest" starts from the tail. + # A group-less consumer in AIOKafka requires explicit partition.assign() + # which is unreliable on a freshly-joined broker; a unique group is simpler. + group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}" consumer = AIOKafkaConsumer( AUDIO_SYNTHESIZED, bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, - # No group_id → simple assign mode, avoids rebalance delays + group_id=group_id, auto_offset_reset="latest", value_deserializer=lambda v: json.loads(v.decode("utf-8")), - enable_auto_commit=False, + enable_auto_commit=True, ) - await consumer.start() - - # Force partition assignment by seeking to end - partitions = consumer.assignment() - if not partitions: - # Wait briefly for automatic assignment - await asyncio.sleep(1) - partitions = consumer.assignment() - for tp in partitions: - await consumer.seek_to_end(tp) + try: + await consumer.start() + except Exception as start_err: + logger.error( + "Egress consumer failed to start for user=%s room=%s: %s", + sanitize_for_log(user_id), + sanitize_for_log(room_code), + start_err, + ) + egress_ready.set() # Unblock ingest so ingest can still run + return logger.info( - "Egress consumer ready. Listening language=%s, partitions=%s", - listening_language, - partitions, - ) - print( - "Egress consumer ready. Listening language=%s, partitions=%s", - listening_language, - partitions, + "Egress consumer ready. group=%s listening_language=%s", + sanitize_for_log(group_id), + sanitize_for_log(listening_language), ) egress_ready.set() # Signal that we are ready to receive @@ -254,23 +259,14 @@ async def egress_task() -> None: logger.info( "Egress received: room=%s target_lang=%s" " listening_lang=%s seq=%d", - payload.room_id, - payload.target_language, - listening_language, - payload.sequence_number, - ) - print( - "Egress received: room=%s" - " target_lang=%s listening_lang=%s seq=%d", - payload.room_id, - payload.target_language, - listening_language, - payload.sequence_number, + sanitize_for_log(payload.room_id), + sanitize_for_log(payload.target_language), + sanitize_for_log(listening_language), + sanitize_for_log(payload.sequence_number), ) # Filter by Room if payload.room_id != room_code: - print(f"Egress: skipping wrong room {payload.room_id}") continue # Language filter: In production with multiple participants, @@ -284,10 +280,10 @@ async def egress_task() -> None: len(participants) > 1 and payload.target_language != listening_language ): - print( - "Egress: skipping lang mismatch" - f" target={payload.target_language} " - f"!= listening={listening_language}" + logger.debug( + "Egress: skipping lang mismatch target=%s != listening=%s", + sanitize_for_log(payload.target_language), + sanitize_for_log(listening_language), ) continue @@ -296,53 +292,35 @@ async def egress_task() -> None: current_highest = highest_seq.get(speaker_key, -1) if payload.sequence_number < current_highest - 10: - logger.debug("Dropped stale audio frame from %s", speaker_key) + logger.debug( + "Dropped stale audio frame from %s", + sanitize_for_log(speaker_key), + ) continue highest_seq[speaker_key] = max( current_highest, payload.sequence_number ) - # Send to client (binary) + # Send synthesized audio to the listener's WebSocket audio_bytes = base64.b64decode(payload.audio_data) - print(f"Egress: about to send {len(audio_bytes)} bytes to client") - - # Also save to disk for testing/validation - output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw") - mode = "ab" if payload.sequence_number > 0 else "wb" - - def _write_audio( - _path: Path = output_path, - _mode: str = mode, - _data: bytes = audio_bytes, - ) -> None: - with _path.open(_mode) as f: - f.write(_data) - - await asyncio.to_thread(_write_audio) - print( - f"Egress: SAVED {len(audio_bytes)} bytes to {output_path} " - f"(seq={payload.sequence_number})" - ) - try: await websocket.send_bytes(audio_bytes) - print( - "Egress: SUCCESSFULLY sent" - f" {len(audio_bytes)} bytes" - " via WebSocket" + logger.info( + "Egress: sent %d bytes to user=%s (seq=%d)", + len(audio_bytes), + sanitize_for_log(user_id), + sanitize_for_log(payload.sequence_number), ) except Exception as send_err: - print( - "Egress: WebSocket send failed" - f" (but file was saved): {send_err}" + logger.warning( + "Egress: WebSocket send failed for user=%s: %s", + sanitize_for_log(user_id), + send_err, ) except Exception as frame_err: - print(f"Error processing egress frame: {frame_err}") - import traceback - - traceback.print_exc() + logger.exception("Error processing egress frame: %s", frame_err) finally: await consumer.stop()