Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion app/modules/meeting/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ async def _check_lobby_required(
tracking_id: str,
display_name: str,
listening_language: str | None,
speaking_language: str | None,
new_guest_token: str | None,
live_pts: dict,
) -> dict | None:
Expand Down Expand Up @@ -319,7 +320,20 @@ async def _check_lobby_required(
else:
final_lang = "en"

await self.state.add_to_lobby(room_code, tracking_id, display_name, final_lang)
if speaking_language:
final_speak_lang = speaking_language
elif user and user.speaking_language:
final_speak_lang = user.speaking_language
else:
final_speak_lang = "en"

await self.state.add_to_lobby(
room_code,
tracking_id,
display_name,
final_lang,
speaking_language=final_speak_lang,
)

cm = get_connection_manager()
await cm.broadcast_to_room(
Expand Down Expand Up @@ -445,6 +459,7 @@ async def join_room(
tracking_id=tracking_id,
display_name=display_name,
listening_language=listening_language,
speaking_language=speaking_language,
new_guest_token=new_guest_token,
live_pts=live_pts,
)
Expand Down
15 changes: 13 additions & 2 deletions app/modules/meeting/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import redis.asyncio as aioredis

from app.core.sanitize import sanitize_for_log
from app.modules.auth.token_store import _get_redis_client
from app.modules.meeting.constants import (
key_room_active_speaker,
Expand Down Expand Up @@ -106,12 +107,18 @@ async def get_participants(self, room_code: str) -> dict[str, dict]:
# ── Lobby Set ────────────────────────────────────────────────────────

async def add_to_lobby(
self, room_code: str, user_id: str, display_name: str, language: str
self,
room_code: str,
user_id: str,
display_name: str,
language: str,
speaking_language: str = "en",
) -> None:
"""Place a user in the waiting room/lobby hash."""
state = {
"display_name": display_name,
"language": language,
"speaking_language": speaking_language,
}
await cast(
"Awaitable[Any]",
Expand Down Expand Up @@ -144,6 +151,7 @@ async def admit_from_lobby(self, room_code: str, user_id: str) -> bool:

lobby_state = json.loads(lobby_data_raw)
language = lobby_state.get("language", "en")
speaking_language = lobby_state.get("speaking_language", "en")
display_name = lobby_state.get("display_name", "")

# A lightweight transaction (pipeline) to ensure we don't have partial state
Expand All @@ -153,6 +161,7 @@ async def admit_from_lobby(self, room_code: str, user_id: str) -> bool:
state = {
"status": "connected",
"language": language,
"speaking_language": speaking_language,
"hardware_ready": True,
"display_name": display_name,
"role": "guest",
Expand Down Expand Up @@ -198,4 +207,6 @@ async def cleanup_room(self, room_code: str) -> None:
)
if keys:
await self._redis.delete(*keys)
logger.info("Cleaned up Redis state for room %s", room_code)
logger.info(
"Cleaned up Redis state for room %s", sanitize_for_log(room_code)
)
130 changes: 54 additions & 76 deletions app/modules/meeting/ws_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
import json
import logging
import time
from pathlib import Path

from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped]
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect

from app.core.config import settings
from app.core.sanitize import log_sanitizer
from app.core.sanitize import log_sanitizer, sanitize_for_log
from app.kafka.topics import AUDIO_SYNTHESIZED, TEXT_ORIGINAL, TEXT_TRANSLATED
from app.modules.meeting.state import MeetingStateService
from app.modules.meeting.ws_dependencies import assert_room_participant, authenticate_ws
Expand Down Expand Up @@ -144,7 +143,7 @@ async def audio_websocket( # noqa: C901

listening_language = participant_state.get("language", "en")
await websocket.accept()
print("Audio WS client connected: %s", user_id)
logger.info("Audio WS client connected: %s", sanitize_for_log(user_id))

ingest_svc = get_audio_ingest_service()
ingest_svc.reset_sequence(f"{room_code}:{user_id}")
Expand Down Expand Up @@ -185,12 +184,17 @@ async def ingest_task() -> None:
room_id=room_code,
user_id=user_id,
audio_bytes=chunk,
source_language=participant_state.get("language", "en"),
# Use speaking_language (what the user speaks) — not
# listening_language (what they want to hear) so STT
# transcribes in the correct language.
source_language=participant_state.get(
"speaking_language", "en"
),
)
except WebSocketDisconnect:
logger.info(
"Audio WS client disconnected (WebSocketDisconnect): %s",
log_sanitizer.sanitize(user_id),
sanitize_for_log(user_id),
)
except RuntimeError as exc:
# Starlette raises RuntimeError once the disconnect frame has been
Expand All @@ -202,7 +206,7 @@ async def ingest_task() -> None:
raise
logger.info(
"Audio WS ingest RuntimeError (socket already closed) for %s: %s",
log_sanitizer.sanitize(user_id),
sanitize_for_log(user_id),
exc,
)

Expand All @@ -211,34 +215,35 @@ async def ingest_task() -> None:

async def egress_task() -> None:
"""Reads Kafka synthesized audio, filters for user, writes to WS."""
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
consumer = AIOKafkaConsumer(
Comment on lines +218 to 223
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Kafka / aiokafka, if two consumers connect with the same group_id, do they share partitions via rebalancing instead of each receiving every message?

💡 Result:

Yes, when two or more consumers connect with the same group_id, they share the partitions of the subscribed topics via rebalancing, meaning each message is delivered to only one consumer in the group [1][2][3]. In Kafka and aiokafka, the consumer group mechanism is designed to distribute the workload of consuming a topic across multiple processes [2][4]. The group coordinator automatically manages this by assigning each partition to exactly one consumer within the group at any given time [1][3][4]. Key points regarding this behavior: - Partition Sharing: If you have a topic with multiple partitions and multiple consumers in the same group, the partitions are divided among those consumers [3][5]. Each consumer processes a unique subset of the partitions, ensuring that no two consumers in the same group process the same message [4][5]. - Rebalancing: If a new consumer joins the group or an existing one leaves (or fails), the group coordinator triggers a rebalance [2][6]. During this process, partitions are reassigned among the active members to maintain an even distribution of work [1][6]. - Scaling: This mechanism allows you to scale your application by adding more consumers to the group, up to the total number of partitions in the topic [4]. If the number of consumers exceeds the number of partitions, the extra consumers will remain idle [4]. - Independent Groups: If you need multiple consumers to receive every message (e.g., for different microservices or independent processing logic), you must assign each consumer or service a unique group_id [5][7]. Consumers in different groups operate independently and will each receive the full stream of messages from the subscribed topics [5].

Citations:


🏁 Script executed:

# Find Python version requirement
fd -t f "(setup.py|pyproject.toml|requirements.txt|.python-version)" | head -20

Repository: Brints/spoken-api

Length of output: 93


🏁 Script executed:

# Examine the actual code context in ws_router.py around lines 218-223
cat -n app/modules/meeting/ws_router.py | sed -n '210,235p'

Repository: Brints/spoken-api

Length of output: 1421


🏁 Script executed:

# Check if uuid or other unique ID generation is already imported
rg -A 5 -B 5 "^import|^from" app/modules/meeting/ws_router.py | head -50

Repository: Brints/spoken-api

Length of output: 987


🏁 Script executed:

# Check Python version requirement
cat pyproject.toml | grep -A 5 -B 5 "python"

Repository: Brints/spoken-api

Length of output: 289


Make the egress group_id actually unique per socket.

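int(time.time()) has one-second resolution. Two audio sockets for the same room_code/user_id opened within the same second compute the same group_id, join the same consumer group, and trigger a rebalance that splits the topic's partitions between them, so each socket can miss synthesized audio instead of receiving a full copy. time.time_ns() (suggested below) shrinks the collision window to the resolution of the platform clock; a random suffix such as uuid.uuid4().hex removes the dependence on the clock entirely and is the safer choice if strict per-socket uniqueness is required.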

Suggested fix
-        group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
+        group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
consumer = AIOKafkaConsumer(
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"
consumer = AIOKafkaConsumer(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/modules/meeting/ws_router.py` around lines 218 - 223, The group_id
constructed with int(time.time()) is not unique enough; replace the timestamp
suffix with a true unique identifier so each socket gets its own consumer group
(avoid rebalance/missed messages). Update the group_id assignment used before
AIOKafkaConsumer (the variable named group_id) to append a UUID or the socket's
unique identifier (e.g., uuid.uuid4().hex or the connection's unique id) and
ensure you import/obtain that identifier where group_id is built so every
connection yields a distinct group name.

AUDIO_SYNTHESIZED,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
# No group_id → simple assign mode, avoids rebalance delays
group_id=group_id,
auto_offset_reset="latest",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
enable_auto_commit=False,
enable_auto_commit=True,
)
await consumer.start()

# Force partition assignment by seeking to end
partitions = consumer.assignment()
if not partitions:
# Wait briefly for automatic assignment
await asyncio.sleep(1)
partitions = consumer.assignment()
for tp in partitions:
await consumer.seek_to_end(tp)
try:
await consumer.start()
except Exception as start_err:
logger.error(
"Egress consumer failed to start for user=%s room=%s: %s",
sanitize_for_log(user_id),
sanitize_for_log(room_code),
start_err,
)
egress_ready.set() # Unblock ingest so ingest can still run
return
Comment on lines +233 to +241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

This fallback path still kills ingest.

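After egress_ready.set(), this return completes task2, so the asyncio.wait(..., return_when=FIRST_COMPLETED) below returns immediately. asyncio.wait does not cancel anything itself; it is the cleanup that follows it (not shown in this hunk) that cancels the still-pending ingest task. Either way, the “ingest can still run” comment is not true in the current control flow.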

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/modules/meeting/ws_router.py` around lines 233 - 241, The except block
that logs the egress start failure must not complete task2 because
asyncio.wait(..., return_when=FIRST_COMPLETED) will cancel the still-pending
ingest task; after setting egress_ready you should not return—keep the egress
task pending so asyncio.wait does not treat it as finished. Replace the final
`return` in the egress-start except handler with logic that leaves the coroutine
pending (for example by awaiting a never-completing Future or otherwise blocking
until canceled) so task2 remains alive; this change should be made in the except
block where egress_ready is set (symbols: egress_ready, task2, ingest task,
asyncio.wait(..., return_when=FIRST_COMPLETED)).


logger.info(
"Egress consumer ready. Listening language=%s, partitions=%s",
listening_language,
partitions,
)
print(
"Egress consumer ready. Listening language=%s, partitions=%s",
listening_language,
partitions,
"Egress consumer ready. group=%s listening_language=%s",
sanitize_for_log(group_id),
sanitize_for_log(listening_language),
)
egress_ready.set() # Signal that we are ready to receive

Expand All @@ -254,23 +259,14 @@ async def egress_task() -> None:
logger.info(
"Egress received: room=%s target_lang=%s"
" listening_lang=%s seq=%d",
payload.room_id,
payload.target_language,
listening_language,
payload.sequence_number,
)
print(
"Egress received: room=%s"
" target_lang=%s listening_lang=%s seq=%d",
payload.room_id,
payload.target_language,
listening_language,
payload.sequence_number,
sanitize_for_log(payload.room_id),
sanitize_for_log(payload.target_language),
sanitize_for_log(listening_language),
sanitize_for_log(payload.sequence_number),
)
Comment thread
aniebietafia marked this conversation as resolved.

# Filter by Room
if payload.room_id != room_code:
print(f"Egress: skipping wrong room {payload.room_id}")
continue

# Language filter: In production with multiple participants,
Expand All @@ -284,10 +280,10 @@ async def egress_task() -> None:
len(participants) > 1
and payload.target_language != listening_language
):
print(
"Egress: skipping lang mismatch"
f" target={payload.target_language} "
f"!= listening={listening_language}"
logger.debug(
"Egress: skipping lang mismatch target=%s != listening=%s",
sanitize_for_log(payload.target_language),
sanitize_for_log(listening_language),
)
continue

Expand All @@ -296,53 +292,35 @@ async def egress_task() -> None:
current_highest = highest_seq.get(speaker_key, -1)

if payload.sequence_number < current_highest - 10:
logger.debug("Dropped stale audio frame from %s", speaker_key)
logger.debug(
"Dropped stale audio frame from %s",
sanitize_for_log(speaker_key),
)
continue

highest_seq[speaker_key] = max(
current_highest, payload.sequence_number
)

# Send to client (binary)
# Send synthesized audio to the listener's WebSocket
audio_bytes = base64.b64decode(payload.audio_data)
print(f"Egress: about to send {len(audio_bytes)} bytes to client")

# Also save to disk for testing/validation
output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw")
mode = "ab" if payload.sequence_number > 0 else "wb"

def _write_audio(
_path: Path = output_path,
_mode: str = mode,
_data: bytes = audio_bytes,
) -> None:
with _path.open(_mode) as f:
f.write(_data)

await asyncio.to_thread(_write_audio)
print(
f"Egress: SAVED {len(audio_bytes)} bytes to {output_path} "
f"(seq={payload.sequence_number})"
)

try:
await websocket.send_bytes(audio_bytes)
print(
"Egress: SUCCESSFULLY sent"
f" {len(audio_bytes)} bytes"
" via WebSocket"
logger.info(
"Egress: sent %d bytes to user=%s (seq=%d)",
len(audio_bytes),
sanitize_for_log(user_id),
sanitize_for_log(payload.sequence_number),
)
except Exception as send_err:
print(
"Egress: WebSocket send failed"
f" (but file was saved): {send_err}"
logger.warning(
"Egress: WebSocket send failed for user=%s: %s",
sanitize_for_log(user_id),
send_err,
)

except Exception as frame_err:
print(f"Error processing egress frame: {frame_err}")
import traceback

traceback.print_exc()
logger.exception("Error processing egress frame: %s", frame_err)

finally:
await consumer.stop()
Expand Down
Loading