fix(meeting): resolve audio pipeline failures and stabilize websocket…#71
Conversation
… egress - Fix 1006 WebSocket disconnect loop by removing Windows-style debug file writes that crashed on Linux/Docker. - Stabilize Kafka audio egress by using unique consumer groups per connection. - Fix STT language mismatch by correctly using `speaking_language` instead of the listening language for transcription. - Update lobby and admission flow to persist `speaking_language` so guests are correctly transcribed after joining. - Enhance security by wrapping PII (User/Room IDs) in logs with `sanitize_for_log`. - Clean up redundant imports and legacy debug code in `ws_router.py`. Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
📝 WalkthroughWalkthroughThe PR extends the lobby admission flow to track and propagate ChangesAudio Flow and Language Tracking
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/modules/meeting/ws_router.py`:
- Around line 259-266: The per-frame logger.info calls in the egress hot path
(the logger.info call that formats sanitize_for_log(payload.room_id),
sanitize_for_log(payload.target_language), sanitize_for_log(listening_language),
sanitize_for_log(payload.sequence_number)) should be removed from the hot path
to avoid log flooding and formatting overhead; either delete them or lower them
to logger.debug and guard with logger.isEnabledFor(logging.DEBUG) so you only
call sanitize_for_log when the log will actually emit; apply the same change to
the other occurrence around the second logger.info (the block referenced as also
applies to 309-314).
- Around line 233-241: The except block that logs the egress start failure must
not complete task2 because asyncio.wait(..., return_when=FIRST_COMPLETED) will
cancel the still-pending ingest task; after setting egress_ready you should not
return—keep the egress task pending so asyncio.wait does not treat it as
finished. Replace the final `return` in the egress-start except handler with
logic that leaves the coroutine pending (for example by awaiting a
never-completing Future or otherwise blocking until canceled) so task2 remains
alive; this change should be made in the except block where egress_ready is set
(symbols: egress_ready, task2, ingest task, asyncio.wait(...,
return_when=FIRST_COMPLETED)).
- Around line 218-223: The group_id constructed with int(time.time()) is not
unique enough; replace the timestamp suffix with a true unique identifier so
each socket gets its own consumer group (avoid rebalance/missed messages).
Update the group_id assignment used before AIOKafkaConsumer (the variable named
group_id) to append a UUID or the socket's unique identifier (e.g.,
uuid.uuid4().hex or the connection's unique id) and ensure you import/obtain
that identifier where group_id is built so every connection yields a distinct
group name.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9f1e1c0c-f950-4351-a675-45ecf2642e3b
📒 Files selected for processing (3)
app/modules/meeting/service.pyapp/modules/meeting/state.pyapp/modules/meeting/ws_router.py
| # Use a unique group_id per connection so Kafka assigns all partitions | ||
| # to this consumer and auto_offset_reset="latest" starts from the tail. | ||
| # A group-less consumer in AIOKafka requires explicit partition.assign() | ||
| # which is unreliable on a freshly-joined broker; a unique group is simpler. | ||
| group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}" | ||
| consumer = AIOKafkaConsumer( |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In Kafka / aiokafka, if two consumers connect with the same group_id, do they share partitions via rebalancing instead of each receiving every message?
💡 Result:
Yes, when two or more consumers connect with the same group_id, they share the partitions of the subscribed topics via rebalancing, meaning each message is delivered to only one consumer in the group [1][2][3]. In Kafka and aiokafka, the consumer group mechanism is designed to distribute the workload of consuming a topic across multiple processes [2][4]. The group coordinator automatically manages this by assigning each partition to exactly one consumer within the group at any given time [1][3][4]. Key points regarding this behavior: - Partition Sharing: If you have a topic with multiple partitions and multiple consumers in the same group, the partitions are divided among those consumers [3][5]. Each consumer processes a unique subset of the partitions, ensuring that no two consumers in the same group process the same message [4][5]. - Rebalancing: If a new consumer joins the group or an existing one leaves (or fails), the group coordinator triggers a rebalance [2][6]. During this process, partitions are reassigned among the active members to maintain an even distribution of work [1][6]. - Scaling: This mechanism allows you to scale your application by adding more consumers to the group, up to the total number of partitions in the topic [4]. If the number of consumers exceeds the number of partitions, the extra consumers will remain idle [4]. - Independent Groups: If you need multiple consumers to receive every message (e.g., for different microservices or independent processing logic), you must assign each consumer or service a unique group_id [5][7]. Consumers in different groups operate independently and will each receive the full stream of messages from the subscribed topics [5].
Citations:
- 1: https://docs.confluent.io/kafka/design/consumer-design.html
- 2: https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
- 3: https://aiokafka.readthedocs.io/en/stable/consumer.html
- 4: https://www.w3tutorials.net/blog/can-single-consumer-read-from-multiple-partitions-of-a-kafka-topic/
- 5: https://stackoverflow.com/questions/35561110/can-multiple-kafka-consumers-read-same-message-from-the-partition
- 6: https://www.confluent.io/learn/kafka-rebalancing/
- 7: https://www.redpanda.com/guides/kafka-architecture-kafka-consumer-group
🏁 Script executed:
# Find Python version requirement
fd -t f "(setup.py|pyproject.toml|requirements.txt|.python-version)" | head -20Repository: Brints/spoken-api
Length of output: 93
🏁 Script executed:
# Examine the actual code context in ws_router.py around lines 218-223
cat -n app/modules/meeting/ws_router.py | sed -n '210,235p'Repository: Brints/spoken-api
Length of output: 1421
🏁 Script executed:
# Check if uuid or other unique ID generation is already imported
rg -A 5 -B 5 "^import|^from" app/modules/meeting/ws_router.py | head -50Repository: Brints/spoken-api
Length of output: 987
🏁 Script executed:
# Check Python version requirement
cat pyproject.toml | grep -A 5 -B 5 "python"Repository: Brints/spoken-api
Length of output: 289
Make the egress group_id actually unique per socket.
int(time.time()) only changes once per second. Two audio sockets for the same room_code/user_id opened within that second will join the same consumer group, trigger a rebalance, and can miss synthesized audio instead of each getting a full copy.
Suggested fix
- group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}"
+ group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Use a unique group_id per connection so Kafka assigns all partitions | |
| # to this consumer and auto_offset_reset="latest" starts from the tail. | |
| # A group-less consumer in AIOKafka requires explicit partition.assign() | |
| # which is unreliable on a freshly-joined broker; a unique group is simpler. | |
| group_id = f"audio-egress-{room_code}-{user_id}-{int(time.time())}" | |
| consumer = AIOKafkaConsumer( | |
| # Use a unique group_id per connection so Kafka assigns all partitions | |
| # to this consumer and auto_offset_reset="latest" starts from the tail. | |
| # A group-less consumer in AIOKafka requires explicit partition.assign() | |
| # which is unreliable on a freshly-joined broker; a unique group is simpler. | |
| group_id = f"audio-egress-{room_code}-{user_id}-{time.time_ns()}" | |
| consumer = AIOKafkaConsumer( |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/modules/meeting/ws_router.py` around lines 218 - 223, The group_id
constructed with int(time.time()) is not unique enough; replace the timestamp
suffix with a true unique identifier so each socket gets its own consumer group
(avoid rebalance/missed messages). Update the group_id assignment used before
AIOKafkaConsumer (the variable named group_id) to append a UUID or the socket's
unique identifier (e.g., uuid.uuid4().hex or the connection's unique id) and
ensure you import/obtain that identifier where group_id is built so every
connection yields a distinct group name.
| except Exception as start_err: | ||
| logger.error( | ||
| "Egress consumer failed to start for user=%s room=%s: %s", | ||
| sanitize_for_log(user_id), | ||
| sanitize_for_log(room_code), | ||
| start_err, | ||
| ) | ||
| egress_ready.set() # Unblock ingest so ingest can still run | ||
| return |
There was a problem hiding this comment.
This fallback path still kills ingest.
After egress_ready.set(), this return completes task2, and the asyncio.wait(..., return_when=FIRST_COMPLETED) below immediately cancels the still-pending ingest task. So the “ingest can still run” comment is not true in the current control flow.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/modules/meeting/ws_router.py` around lines 233 - 241, The except block
that logs the egress start failure must not complete task2 because
asyncio.wait(..., return_when=FIRST_COMPLETED) will cancel the still-pending
ingest task; after setting egress_ready you should not return—keep the egress
task pending so asyncio.wait does not treat it as finished. Replace the final
`return` in the egress-start except handler with logic that leaves the coroutine
pending (for example by awaiting a never-completing Future or otherwise blocking
until canceled) so task2 remains alive; this change should be made in the except
block where egress_ready is set (symbols: egress_ready, task2, ingest task,
asyncio.wait(..., return_when=FIRST_COMPLETED)).
… egress
speaking_languageinstead of the listening language for transcription.speaking_languageso guests are correctly transcribed after joining.sanitize_for_log.ws_router.py.Summary by CodeRabbit
New Features
Improvements