Skip to content

fix(meeting): resolve audio pipeline failures and stabilize websocket…#71

Merged
aniebietafia merged 1 commit into
mainfrom
fix/audio_transcribe_pipeline
May 15, 2026
Merged

fix(meeting): resolve audio pipeline failures and stabilize websocket…#71
aniebietafia merged 1 commit into
mainfrom
fix/audio_transcribe_pipeline

Conversation

@aniebietafia
Copy link
Copy Markdown
Contributor

@aniebietafia aniebietafia commented May 15, 2026

… egress

  • Fix 1006 WebSocket disconnect loop by removing Windows-style debug file writes that crashed on Linux/Docker.
  • Stabilize Kafka audio egress by using unique consumer groups per connection.
  • Fix STT language mismatch by correctly using speaking_language instead of the listening language for transcription.
  • Update lobby and admission flow to persist speaking_language so guests are correctly transcribed after joining.
  • Enhance security by wrapping PII (User/Room IDs) in logs with sanitize_for_log.
  • Clean up redundant imports and legacy debug code in ws_router.py.

Summary by CodeRabbit

  • New Features

    • Added speaking language tracking for meeting participants, enabling the system to properly distinguish between listening and speaking language preferences.
  • Improvements

    • Enhanced audio streaming reliability with improved error handling for connection issues.
    • Standardized system logging for better diagnostics and troubleshooting.

Review Change Stack

… egress

- Fix 1006 WebSocket disconnect loop by removing Windows-style debug file writes that crashed on Linux/Docker.
- Stabilize Kafka audio egress by using unique consumer groups per connection.
- Fix STT language mismatch by correctly using `speaking_language` instead of the listening language for transcription.
- Update lobby and admission flow to persist `speaking_language` so guests are correctly transcribed after joining.
- Enhance security by wrapping PII (User/Room IDs) in logs with `sanitize_for_log`.
- Clean up redundant imports and legacy debug code in `ws_router.py`.

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 15, 2026

📝 Walkthrough

Walkthrough

The PR extends the lobby admission flow to track and propagate speaking_language through service and state layers, then refactors WebSocket audio ingest to use speaking language as the STT source. Audio egress is substantially reworked to use group-based Kafka consumer management instead of manual offset logic, with all diagnostics converted to structured logging via sanitized logger calls.

Changes

Audio Flow and Language Tracking

Layer / File(s) Summary
Speaking language tracking through lobby admission
app/modules/meeting/service.py, app/modules/meeting/state.py
Service _check_lobby_required now derives speaking_language with explicit/user-default/"en" priority and forwards it to state. State add_to_lobby stores speaking_language in lobby records, and admit_from_lobby carries it forward into participants. Cleanup logging is updated to use sanitize_for_log.
Audio ingest source language and logging
app/modules/meeting/ws_router.py
Ingest WebSocket handler switches STT source_language from participant language field to speaking_language (defaulting to "en"), adds sanitized logging for connection/disconnection events, and updates error-path logging.
Audio egress consumer refactor and logging
app/modules/meeting/ws_router.py
Kafka consumer is reworked to use unique per-connection group-based offsets (with auto_offset_reset="latest" and enable_auto_commit=True) instead of manual partition seeking. Startup failure handling is added to unblock ingest if egress fails. All diagnostics are converted from prints/disk persistence to structured logger.debug/info/warning/exception calls with sanitized field values for room filtering, language mismatch detection, stale-frame dropping, and WebSocket send operations.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • Brints/spoken-api#66: Both PRs modify the lobby→room admission path in the meeting module—main PR extends add_to_lobby/admit_from_lobby to carry speaking_language, while PR #66 changes admit_from_lobby/admit_user to set presence fields (e.g., display_name/role) and emit join notifications.

Suggested labels

backend, size/S

Poem

🐰 A rabbit hops through speaking tongues,
From lobby's wait to audio's song,
Each language tracked from start to end,
While Kafka offsets now blend,
And logs run clean—no prints remain! 🎵

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'fix(meeting): resolve audio pipeline failures and stabilize websocket' directly aligns with the PR's main objectives to fix audio pipeline issues and stabilize WebSocket egress.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/audio_transcribe_pipeline

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/modules/meeting/ws_router.py`:
- Around line 259-266: The per-frame logger.info calls in the egress hot path
(the logger.info call that formats sanitize_for_log(payload.room_id),
sanitize_for_log(payload.target_language), sanitize_for_log(listening_language),
sanitize_for_log(payload.sequence_number)) should be removed from the hot path
to avoid log flooding and formatting overhead; either delete them or lower them
to logger.debug and guard with logger.isEnabledFor(logging.DEBUG) so you only
call sanitize_for_log when the log will actually emit; apply the same change to
the other occurrence around the second logger.info (the block referenced as also
applies to 309-314).
- Around line 233-241: The except block that logs the egress start failure must
not complete task2 because asyncio.wait(..., return_when=FIRST_COMPLETED) will
cancel the still-pending ingest task; after setting egress_ready you should not
return—keep the egress task pending so asyncio.wait does not treat it as
finished. Replace the final `return` in the egress-start except handler with
logic that leaves the coroutine pending (for example by awaiting a
never-completing Future or otherwise blocking until canceled) so task2 remains
alive; this change should be made in the except block where egress_ready is set
(symbols: egress_ready, task2, ingest task, asyncio.wait(...,
return_when=FIRST_COMPLETED)).
- Around line 218-223: The group_id constructed with int(time.time()) is not
unique enough; replace the timestamp suffix with a true unique identifier so
each socket gets its own consumer group (avoid rebalance/missed messages).
Update the group_id assignment used before AIOKafkaConsumer (the variable named
group_id) to append a UUID or the socket's unique identifier (e.g.,
uuid.uuid4().hex or the connection's unique id) and ensure you import/obtain
that identifier where group_id is built so every connection yields a distinct
group name.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9f1e1c0c-f950-4351-a675-45ecf2642e3b

📥 Commits

Reviewing files that changed from the base of the PR and between 12eb607 and 6882322.

📒 Files selected for processing (3)
  • app/modules/meeting/service.py
  • app/modules/meeting/state.py
  • app/modules/meeting/ws_router.py

Comment on lines +218 to 223
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
consumer = AIOKafkaConsumer(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Kafka / aiokafka, if two consumers connect with the same group_id, do they share partitions via rebalancing instead of each receiving every message?

💡 Result:

Yes, when two or more consumers connect with the same group_id, they share the partitions of the subscribed topics via rebalancing, meaning each message is delivered to only one consumer in the group [1][2][3]. In Kafka and aiokafka, the consumer group mechanism is designed to distribute the workload of consuming a topic across multiple processes [2][4]. The group coordinator automatically manages this by assigning each partition to exactly one consumer within the group at any given time [1][3][4]. Key points regarding this behavior: - Partition Sharing: If you have a topic with multiple partitions and multiple consumers in the same group, the partitions are divided among those consumers [3][5]. Each consumer processes a unique subset of the partitions, ensuring that no two consumers in the same group process the same message [4][5]. - Rebalancing: If a new consumer joins the group or an existing one leaves (or fails), the group coordinator triggers a rebalance [2][6]. During this process, partitions are reassigned among the active members to maintain an even distribution of work [1][6]. - Scaling: This mechanism allows you to scale your application by adding more consumers to the group, up to the total number of partitions in the topic [4]. If the number of consumers exceeds the number of partitions, the extra consumers will remain idle [4]. - Independent Groups: If you need multiple consumers to receive every message (e.g., for different microservices or independent processing logic), you must assign each consumer or service a unique group_id [5][7]. Consumers in different groups operate independently and will each receive the full stream of messages from the subscribed topics [5].

Citations:


🏁 Script executed:

# Find Python version requirement
fd -t f "(setup.py|pyproject.toml|requirements.txt|.python-version)" | head -20

Repository: Brints/spoken-api

Length of output: 93


🏁 Script executed:

# Examine the actual code context in ws_router.py around lines 218-223
cat -n app/modules/meeting/ws_router.py | sed -n '210,235p'

Repository: Brints/spoken-api

Length of output: 1421


🏁 Script executed:

# Check if uuid or other unique ID generation is already imported
rg -A 5 -B 5 "^import|^from" app/modules/meeting/ws_router.py | head -50

Repository: Brints/spoken-api

Length of output: 987


🏁 Script executed:

# Check Python version requirement
cat pyproject.toml | grep -A 5 -B 5 "python"

Repository: Brints/spoken-api

Length of output: 289


Make the egress group_id actually unique per socket.

int(time.time()) only changes once per second. Two audio sockets for the same room_code/user_id opened within that second will join the same consumer group, trigger a rebalance, and can miss synthesized audio instead of each getting a full copy.

Suggested fix
-        group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
+        group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
consumer = AIOKafkaConsumer(
# Use a unique group_id per connection so Kafka assigns all partitions
# to this consumer and auto_offset_reset="latest" starts from the tail.
# A group-less consumer in AIOKafka requires explicit partition.assign()
# which is unreliable on a freshly-joined broker; a unique group is simpler.
group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"
consumer = AIOKafkaConsumer(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/modules/meeting/ws_router.py` around lines 218 - 223, The group_id
constructed with int(time.time()) is not unique enough; replace the timestamp
suffix with a true unique identifier so each socket gets its own consumer group
(avoid rebalance/missed messages). Update the group_id assignment used before
AIOKafkaConsumer (the variable named group_id) to append a UUID or the socket's
unique identifier (e.g., uuid.uuid4().hex or the connection's unique id) and
ensure you import/obtain that identifier where group_id is built so every
connection yields a distinct group name.

Comment on lines +233 to +241
except Exception as start_err:
logger.error(
"Egress consumer failed to start for user=%s room=%s: %s",
sanitize_for_log(user_id),
sanitize_for_log(room_code),
start_err,
)
egress_ready.set() # Unblock ingest so ingest can still run
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

This fallback path still kills ingest.

After egress_ready.set(), this return completes task2, and the asyncio.wait(..., return_when=FIRST_COMPLETED) below immediately cancels the still-pending ingest task. So the “ingest can still run” comment is not true in the current control flow.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/modules/meeting/ws_router.py` around lines 233 - 241, The except block
that logs the egress start failure must not complete task2 because
asyncio.wait(..., return_when=FIRST_COMPLETED) will cancel the still-pending
ingest task; after setting egress_ready you should not return—keep the egress
task pending so asyncio.wait does not treat it as finished. Replace the final
`return` in the egress-start except handler with logic that leaves the coroutine
pending (for example by awaiting a never-completing Future or otherwise blocking
until canceled) so task2 remains alive; this change should be made in the except
block where egress_ready is set (symbols: egress_ready, task2, ingest task,
asyncio.wait(..., return_when=FIRST_COMPLETED)).

Comment thread app/modules/meeting/ws_router.py
@aniebietafia aniebietafia merged commit 404fce2 into main May 15, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant