chore(kafka): implement staleness filtering and enforce short topic r…#72
Conversation
…etention - Add `max_message_age_ms` staleness guard to `BaseConsumer` to automatically skip and commit backlogged messages from dead sessions. - Configure STT, Translation, and TTS workers to discard any messages older than 2 minutes to prevent processing "ghost" audio on restarts. - Enforce a 5-minute `retention.ms` policy on all real-time pipeline topics (`audio.*`, `text.*`) in `KafkaManager` using `alter_configs`. - Fix `alter_configs` API usage in `KafkaManager` by utilizing `ConfigResource` objects. - This ensures the pipeline remains lean and immediately responsive even after backend downtime or restarts. Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (6)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…etention
max_message_age_msstaleness guard toBaseConsumerto automatically skip and commit backlogged messages from dead sessions.retention.mspolicy on all real-time pipeline topics (audio.*,text.*) inKafkaManagerusingalter_configs.alter_configsAPI usage inKafkaManagerby utilizingConfigResourceobjects.