-
Notifications
You must be signed in to change notification settings - Fork 0
fix(meeting): resolve audio pipeline failures and stabilize websocket… #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,13 +9,12 @@ | |
| import json | ||
| import logging | ||
| import time | ||
| from pathlib import Path | ||
|
|
||
| from aiokafka import AIOKafkaConsumer # type: ignore[import-untyped] | ||
| from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect | ||
|
|
||
| from app.core.config import settings | ||
| from app.core.sanitize import log_sanitizer | ||
| from app.core.sanitize import log_sanitizer, sanitize_for_log | ||
| from app.kafka.topics import AUDIO_SYNTHESIZED, TEXT_ORIGINAL, TEXT_TRANSLATED | ||
| from app.modules.meeting.state import MeetingStateService | ||
| from app.modules.meeting.ws_dependencies import assert_room_participant, authenticate_ws | ||
|
|
@@ -144,7 +143,7 @@ async def audio_websocket( # noqa: C901 | |
|
|
||
| listening_language = participant_state.get("language", "en") | ||
| await websocket.accept() | ||
| print("Audio WS client connected: %s", user_id) | ||
| logger.info("Audio WS client connected: %s", sanitize_for_log(user_id)) | ||
|
|
||
| ingest_svc = get_audio_ingest_service() | ||
| ingest_svc.reset_sequence(f"{room_code}:{user_id}") | ||
|
|
@@ -185,12 +184,17 @@ async def ingest_task() -> None: | |
| room_id=room_code, | ||
| user_id=user_id, | ||
| audio_bytes=chunk, | ||
| source_language=participant_state.get("language", "en"), | ||
| # Use speaking_language (what the user speaks) — not | ||
| # listening_language (what they want to hear) so STT | ||
| # transcribes in the correct language. | ||
| source_language=participant_state.get( | ||
| "speaking_language", "en" | ||
| ), | ||
| ) | ||
| except WebSocketDisconnect: | ||
| logger.info( | ||
| "Audio WS client disconnected (WebSocketDisconnect): %s", | ||
| log_sanitizer.sanitize(user_id), | ||
| sanitize_for_log(user_id), | ||
| ) | ||
| except RuntimeError as exc: | ||
| # Starlette raises RuntimeError once the disconnect frame has been | ||
|
|
@@ -202,7 +206,7 @@ async def ingest_task() -> None: | |
| raise | ||
| logger.info( | ||
| "Audio WS ingest RuntimeError (socket already closed) for %s: %s", | ||
| log_sanitizer.sanitize(user_id), | ||
| sanitize_for_log(user_id), | ||
| exc, | ||
| ) | ||
|
|
||
|
|
@@ -211,34 +215,35 @@ async def ingest_task() -> None: | |
|
|
||
| async def egress_task() -> None: | ||
| """Reads Kafka synthesized audio, filters for user, writes to WS.""" | ||
| # Use a unique group_id per connection so Kafka assigns all partitions | ||
| # to this consumer and auto_offset_reset="latest" starts from the tail. | ||
| # A group-less consumer in AIOKafka requires explicit partition.assign() | ||
| # which is unreliable on a freshly-joined broker; a unique group is simpler. | ||
| group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}" | ||
| consumer = AIOKafkaConsumer( | ||
| AUDIO_SYNTHESIZED, | ||
| bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, | ||
| # No group_id → simple assign mode, avoids rebalance delays | ||
| group_id=group_id, | ||
| auto_offset_reset="latest", | ||
| value_deserializer=lambda v: json.loads(v.decode("utf-8")), | ||
| enable_auto_commit=False, | ||
| enable_auto_commit=True, | ||
| ) | ||
| await consumer.start() | ||
|
|
||
| # Force partition assignment by seeking to end | ||
| partitions = consumer.assignment() | ||
| if not partitions: | ||
| # Wait briefly for automatic assignment | ||
| await asyncio.sleep(1) | ||
| partitions = consumer.assignment() | ||
| for tp in partitions: | ||
| await consumer.seek_to_end(tp) | ||
| try: | ||
| await consumer.start() | ||
| except Exception as start_err: | ||
| logger.error( | ||
| "Egress consumer failed to start for user=%s room=%s: %s", | ||
| sanitize_for_log(user_id), | ||
| sanitize_for_log(room_code), | ||
| start_err, | ||
| ) | ||
| egress_ready.set() # Unblock ingest so ingest can still run | ||
| return | ||
|
Comment on lines
+233
to
+241
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fallback path still kills ingest. After 🤖 Prompt for AI Agents |
||
|
|
||
| logger.info( | ||
| "Egress consumer ready. Listening language=%s, partitions=%s", | ||
| listening_language, | ||
| partitions, | ||
| ) | ||
| print( | ||
| "Egress consumer ready. Listening language=%s, partitions=%s", | ||
| listening_language, | ||
| partitions, | ||
| "Egress consumer ready. group=%s listening_language=%s", | ||
| sanitize_for_log(group_id), | ||
| sanitize_for_log(listening_language), | ||
| ) | ||
| egress_ready.set() # Signal that we are ready to receive | ||
|
|
||
|
|
@@ -254,23 +259,14 @@ async def egress_task() -> None: | |
| logger.info( | ||
| "Egress received: room=%s target_lang=%s" | ||
| " listening_lang=%s seq=%d", | ||
| payload.room_id, | ||
| payload.target_language, | ||
| listening_language, | ||
| payload.sequence_number, | ||
| ) | ||
| print( | ||
| "Egress received: room=%s" | ||
| " target_lang=%s listening_lang=%s seq=%d", | ||
| payload.room_id, | ||
| payload.target_language, | ||
| listening_language, | ||
| payload.sequence_number, | ||
| sanitize_for_log(payload.room_id), | ||
| sanitize_for_log(payload.target_language), | ||
| sanitize_for_log(listening_language), | ||
| sanitize_for_log(payload.sequence_number), | ||
| ) | ||
|
aniebietafia marked this conversation as resolved.
|
||
|
|
||
| # Filter by Room | ||
| if payload.room_id != room_code: | ||
| print(f"Egress: skipping wrong room {payload.room_id}") | ||
| continue | ||
|
|
||
| # Language filter: In production with multiple participants, | ||
|
|
@@ -284,10 +280,10 @@ async def egress_task() -> None: | |
| len(participants) > 1 | ||
| and payload.target_language != listening_language | ||
| ): | ||
| print( | ||
| "Egress: skipping lang mismatch" | ||
| f" target={payload.target_language} " | ||
| f"!= listening={listening_language}" | ||
| logger.debug( | ||
| "Egress: skipping lang mismatch target=%s != listening=%s", | ||
| sanitize_for_log(payload.target_language), | ||
| sanitize_for_log(listening_language), | ||
| ) | ||
| continue | ||
|
|
||
|
|
@@ -296,53 +292,35 @@ async def egress_task() -> None: | |
| current_highest = highest_seq.get(speaker_key, -1) | ||
|
|
||
| if payload.sequence_number < current_highest - 10: | ||
| logger.debug("Dropped stale audio frame from %s", speaker_key) | ||
| logger.debug( | ||
| "Dropped stale audio frame from %s", | ||
| sanitize_for_log(speaker_key), | ||
| ) | ||
| continue | ||
|
|
||
| highest_seq[speaker_key] = max( | ||
| current_highest, payload.sequence_number | ||
| ) | ||
|
|
||
| # Send to client (binary) | ||
| # Send synthesized audio to the listener's WebSocket | ||
| audio_bytes = base64.b64decode(payload.audio_data) | ||
| print(f"Egress: about to send {len(audio_bytes)} bytes to client") | ||
|
|
||
| # Also save to disk for testing/validation | ||
| output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw") | ||
| mode = "ab" if payload.sequence_number > 0 else "wb" | ||
|
|
||
| def _write_audio( | ||
| _path: Path = output_path, | ||
| _mode: str = mode, | ||
| _data: bytes = audio_bytes, | ||
| ) -> None: | ||
| with _path.open(_mode) as f: | ||
| f.write(_data) | ||
|
|
||
| await asyncio.to_thread(_write_audio) | ||
| print( | ||
| f"Egress: SAVED {len(audio_bytes)} bytes to {output_path} " | ||
| f"(seq={payload.sequence_number})" | ||
| ) | ||
|
|
||
| try: | ||
| await websocket.send_bytes(audio_bytes) | ||
| print( | ||
| "Egress: SUCCESSFULLY sent" | ||
| f" {len(audio_bytes)} bytes" | ||
| " via WebSocket" | ||
| logger.info( | ||
| "Egress: sent %d bytes to user=%s (seq=%d)", | ||
| len(audio_bytes), | ||
| sanitize_for_log(user_id), | ||
| sanitize_for_log(payload.sequence_number), | ||
| ) | ||
| except Exception as send_err: | ||
| print( | ||
| "Egress: WebSocket send failed" | ||
| f" (but file was saved): {send_err}" | ||
| logger.warning( | ||
| "Egress: WebSocket send failed for user=%s: %s", | ||
| sanitize_for_log(user_id), | ||
| send_err, | ||
| ) | ||
|
|
||
| except Exception as frame_err: | ||
| print(f"Error processing egress frame: {frame_err}") | ||
| import traceback | ||
|
|
||
| traceback.print_exc() | ||
| logger.exception("Error processing egress frame: %s", frame_err) | ||
|
|
||
| finally: | ||
| await consumer.stop() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🌐 Web query:
In Kafka / aiokafka, if two consumers connect with the same group_id, do they share partitions via rebalancing instead of each receiving every message?💡 Result:
Yes, when two or more consumers connect with the same group_id, they share the partitions of the subscribed topics via rebalancing, meaning each message is delivered to only one consumer in the group [1][2][3]. In Kafka and aiokafka, the consumer group mechanism is designed to distribute the workload of consuming a topic across multiple processes [2][4]. The group coordinator automatically manages this by assigning each partition to exactly one consumer within the group at any given time [1][3][4]. Key points regarding this behavior: - Partition Sharing: If you have a topic with multiple partitions and multiple consumers in the same group, the partitions are divided among those consumers [3][5]. Each consumer processes a unique subset of the partitions, ensuring that no two consumers in the same group process the same message [4][5]. - Rebalancing: If a new consumer joins the group or an existing one leaves (or fails), the group coordinator triggers a rebalance [2][6]. During this process, partitions are reassigned among the active members to maintain an even distribution of work [1][6]. - Scaling: This mechanism allows you to scale your application by adding more consumers to the group, up to the total number of partitions in the topic [4]. If the number of consumers exceeds the number of partitions, the extra consumers will remain idle [4]. - Independent Groups: If you need multiple consumers to receive every message (e.g., for different microservices or independent processing logic), you must assign each consumer or service a unique group_id [5][7]. Consumers in different groups operate independently and will each receive the full stream of messages from the subscribed topics [5].
Citations:
🏁 Script executed:
Repository: Brints/spoken-api
Length of output: 93
🏁 Script executed:
Repository: Brints/spoken-api
Length of output: 1421
🏁 Script executed:
Repository: Brints/spoken-api
Length of output: 987
🏁 Script executed:
Repository: Brints/spoken-api
Length of output: 289
Make the egress
group_idactually unique per socket.int(time.time())only changes once per second. Two audio sockets for the sameroom_code/user_idopened within that second will join the same consumer group, trigger a rebalance, and can miss synthesized audio instead of each getting a full copy.Suggested fix
📝 Committable suggestion
🤖 Prompt for AI Agents