Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 70 additions & 11 deletions Runtime/Scripts/AudioStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using UnityEngine;
using LiveKit.Internal;
using LiveKit.Proto;
Expand Down Expand Up @@ -40,6 +42,12 @@ public sealed class AudioStream : IDisposable
private const int CrossfadeFrames = 128; // ~2.7ms @ 48kHz
private int _skipCooldown = 0;

// Resample work runs on a dedicated background thread instead of the FFI callback
// thread, which is OS-audio-scheduled and would otherwise compete with Unity audio.
private readonly BlockingCollection<AudioFrame> _frameQueue =
new BlockingCollection<AudioFrame>(new ConcurrentQueue<AudioFrame>(), boundedCapacity: 50);
private readonly Thread _resampleThread;

/// <summary>
/// Creates a new audio stream from a remote audio track, attaching it to the
/// given <see cref="AudioSource"/> in the scene.
Expand Down Expand Up @@ -73,6 +81,13 @@ public AudioStream(RemoteAudioTrack audioTrack, AudioSource source)

// Subscribe to application pause events to handle background/foreground transitions
MonoBehaviourContext.OnApplicationPauseEvent += OnApplicationPause;

_resampleThread = new Thread(ResampleLoop)
{
IsBackground = true,
Name = "LiveKit AudioStream Resample",
};
_resampleThread.Start();
}

// Called on Unity audio thread
Expand Down Expand Up @@ -235,7 +250,8 @@ internal void OnApplicationPause(bool pause)
}
}

// Called on FFI callback thread
// Called on FFI callback thread. Hands the frame off to the worker so the
// OS-audio-scheduled FFI thread doesn't pay for resample DSP.
private void OnAudioStreamEvent(AudioStreamEvent e)
{
if (_disposed)
Expand All @@ -247,24 +263,58 @@ private void OnAudioStreamEvent(AudioStreamEvent e)
if (e.MessageCase != AudioStreamEvent.MessageOneofCase.FrameReceived)
return;

using var frame = new AudioFrame(e.FrameReceived.Frame);

lock (_lock)
var frame = new AudioFrame(e.FrameReceived.Frame);
try
{
if (_numChannels == 0)
if (!_frameQueue.IsAddingCompleted && _frameQueue.TryAdd(frame, millisecondsTimeout: 0))
return;
}
catch (InvalidOperationException)
{
// Adding was completed between the IsAddingCompleted check and TryAdd.
}

// Queue full or shutting down — drop the frame.
Utils.Debug("AudioStream resample queue full; dropping frame");
frame.Dispose();
}

unsafe
// Background worker thread. Drains _frameQueue and runs RemixAndResample off the
// FFI callback thread so DSP work doesn't compete with Unity audio rendering.
private void ResampleLoop()
{
try
{
foreach (var frame in _frameQueue.GetConsumingEnumerable())
{
using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);
if (uFrame != null)
try
{
var data = new Span<byte>(uFrame.Data.ToPointer(), uFrame.Length);
_buffer?.Write(data);
}
lock (_lock)
{
if (_disposed || _numChannels == 0)
continue;

unsafe
{
using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);
if (uFrame != null)
{
var data = new Span<byte>(uFrame.Data.ToPointer(), uFrame.Length);
_buffer?.Write(data);
}
}
}
}
finally
{
frame.Dispose();
}
}
}
catch (Exception ex)
{
Utils.Error($"AudioStream ResampleLoop exited unexpectedly: {ex}");
}
}

public void Dispose()
Expand All @@ -286,6 +336,13 @@ private void Dispose(bool disposing)
FfiClient.Instance.AudioStreamEventReceived -= OnAudioStreamEvent;
MonoBehaviourContext.OnApplicationPauseEvent -= OnApplicationPause;

// Stop the worker before tearing down state it touches under _lock. Joining is only
// safe on the explicit-dispose path; from a finalizer we just signal and let the
// background thread exit on its own.
_frameQueue.CompleteAdding();
if (disposing && _resampleThread != null && _resampleThread.IsAlive)
_resampleThread.Join();

lock (_lock)
{
// Native resources can be released on both the explicit-dispose and finalizer
Expand All @@ -310,6 +367,8 @@ private void Dispose(bool disposing)
Handle.Dispose();
}

_frameQueue.Dispose();

_disposed = true;
}

Expand Down
10 changes: 7 additions & 3 deletions Tests/EditMode/MediaStreamLifetimeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public void AudioStream_Dispose_UnsubscribesAndReleasesOwnedResources()

StringAssert.Contains("FfiClient.Instance.AudioStreamEventReceived -= OnAudioStreamEvent;", source);
StringAssert.Contains("_probe.AudioRead -= OnAudioRead;", source);
StringAssert.Contains("_frameQueue.CompleteAdding();", source);
StringAssert.Contains("_resampleThread.Join();", source);
StringAssert.Contains("_buffer?.Dispose();", source);
StringAssert.Contains("_resampler?.Dispose();", source);
StringAssert.Contains("Handle.Dispose();", source);
Expand All @@ -132,9 +134,11 @@ public void AudioStream_AudioFrames_AreDisposedAfterProcessing()
{
var source = ReadSource(AudioStreamPaths);

// Both the inbound native frame and the remixed output frame should be scoped so their
// handles are released after each callback rather than accumulating over time.
StringAssert.Contains("using var frame = new AudioFrame(e.FrameReceived.Frame);", source);
// The inbound frame is enqueued on the FFI thread and disposed by the worker
// (in finally) after RemixAndResample. The remixed output frame is still scoped
// with using so handles do not accumulate.
StringAssert.Contains("var frame = new AudioFrame(e.FrameReceived.Frame);", source);
StringAssert.Contains("frame.Dispose();", source);
StringAssert.Contains("using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);", source);
}

Expand Down
Loading