Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions EventThreadingModel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## Summary

FFI events whose handlers don't touch Unity APIs run directly on the FFI callback thread instead of being marshaled to Unity's main thread via `SynchronizationContext.Post`.

`FFICallback` previously usually routes Rust events (except `AudioStreamEvent`) through `_context.Post` to Unity's main thread. That's the safe default for handlers that touch Unity APIs (`Texture2D`, `GameObject`, `Transform`, …) but it costs one frame of latency for handlers that don't. Four categories of events can skip that:

1. **Audio stream events** that are written to the audio stream ring buffer and consumed on audio thread.
2. **One-shot async completions** that only flip `IsDone` on a `YieldInstruction` — `SetMetadata`, `UnpublishTrack`, all stream `Read/Write/Close` ops.
3. **Stream reader chunk events** that just append bytes/strings to an internal buffer.
4. **Log batches** — `UnityEngine.Debug.unityLogger` is documented thread-safe; the post hop adds latency without benefit, especially during error storms or `LK_VERBOSE` noise.

## Logic in code:

```csharp
internal static void RouteFfiEvent(FfiEvent response)
{
if (_isDisposed) return;

// 1. Per-event-type fast paths — invoke handler directly on FFI thread.
if (response.MessageCase == FfiEvent.MessageOneofCase.AudioStreamEvent) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) { ...; return; }

// 2. One-shot completion fast path — opted-in pending callbacks complete inline.
var requestAsyncId = ExtractRequestAsyncId(response);
if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
return;

// 3. Fallback — post to Unity's main-thread sync context.
Instance._context?.Post(static (resp) =>
{
var r = resp as FfiEvent;
if (r == null) return;
DispatchEvent(r);
}, response);
}
```

## Event Table

| Event | Where it runs | Why |
| --- | --- | --- |
| `AudioStreamEvent` | **FFI thread** (unchanged) | Audio thread consumes the data; main-thread latency would hurt timing |
| `Logs` | **FFI thread** (new) | `Debug.unityLogger` is thread-safe; logs reach console immediately during panics / errors |
| `ByteStreamReaderEvent` | **FFI thread** (new) | Internal buffer is now lock-protected; chunks land without frame delay |
| `TextStreamReaderEvent` | **FFI thread** (new) | Same lock as byte path (shared `ReadIncrementalInstructionBase`) |
| One-shot completions via `FfiInstruction<T>` | **FFI thread** (new) | `SetLocalMetadata`, `SetLocalName`, `SetLocalAttributes`, `UnpublishTrack` — only flip `IsDone`/`IsError` |
| One-shot completions via `FfiStreamInstruction<T>` | **FFI thread** (new) | `ByteStreamWriter.Write/Close`, `TextStreamWriter.Write/Close` |
| One-shot completions via `FfiStreamResultInstruction<T,U>` | **FFI thread** (new) | `ByteStreamReader.ReadAll/WriteToFile`, `TextStreamReader.ReadAll` |
| `RoomEvent` | Main thread | Fires user-facing `ParticipantConnected`, `TrackPublished`, etc. |
| `TrackEvent` | Main thread | (No subscribers today; main-thread default for safety) |
| `RpcMethodInvocation` | Main thread | User RPC handlers commonly touch game state |
| `Disconnect` | Main thread | UI updates typical |
| `VideoStreamEvent` | Main thread | Internal buffering is fast; user-facing raw delivery deferred (see follow-ups) |
| `DataTrackStreamEvent` | Main thread | Deferred until a concrete consumer asks |
| `Connect` (one-shot) | Main thread | Bespoke handler fires participant-connected events |
| `PublishTrack` (one-shot) | Main thread | Bespoke handler |
| `GetStats` (one-shot) | Main thread | Bespoke handler |
| `CaptureAudioFrame` (one-shot) | Main thread | Bespoke handler |
| `PerformRpc` (one-shot) | Main thread | Bespoke handler surfaces response |
| `SendText` / `SendFile` (one-shot) | Main thread | Bespoke handlers |
| `TextStreamOpen` / `ByteStreamOpen` (one-shot) | Main thread | Bespoke handlers return writer objects |
| `PublishDataTrack` (one-shot) | Main thread | Bespoke handler |
7 changes: 7 additions & 0 deletions EventThreadingModel.md.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 39 additions & 17 deletions Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
private readonly Queue<TContent> _pendingChunks = new();
private TContent _latestChunk;

// Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter
// run on the main-thread coroutine. _gate serializes mutations of the queue,
// _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides.
private readonly object _gate = new();

/// <summary>
/// Error that occurred on the last read, if any.
/// </summary>
Expand All @@ -94,8 +99,11 @@ protected TContent LatestChunk
{
get
{
if (Error != null) throw Error;
return _latestChunk;
lock (_gate)
{
if (Error != null) throw Error;
return _latestChunk;
}
}
}

Expand All @@ -108,34 +116,48 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle)

protected void OnChunk(TContent content)
{
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
lock (_gate)
{
_latestChunk = content;
IsCurrentReadDone = true;
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
{
_latestChunk = content;
IsCurrentReadDone = true;
}
}
}

public override void Reset()
{
base.Reset();
if (_pendingChunks.Count > 0)
// base.Reset() must run under the same lock as OnChunk, otherwise the
// window between IsCurrentReadDone=false (from base) and the dequeue
// below lets a producer race in, write _latestChunk, and have its
// chunk immediately overwritten by the dequeue. That race lost ~4% of
// chunks under stress before this fix.
lock (_gate)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
base.Reset();
if (_pendingChunks.Count > 0)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
}
}
}

protected void OnEos(Proto.StreamError protoError)
{
IsEos = true;
if (protoError != null)
lock (_gate)
{
Error = new StreamError(protoError);
IsEos = true;
if (protoError != null)
{
Error = new StreamError(protoError);
}
}
}
}
Expand Down
103 changes: 87 additions & 16 deletions Runtime/Scripts/Internal/FFIClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ internal void RegisterPendingCallback<TCallback>(
ulong requestAsyncId,
Func<FfiEvent, TCallback?> selector,
Action<TCallback> onComplete,
Action? onCancel = null
Action? onCancel = null,
bool dispatchToMainThread = true
) where TCallback : class
{
// Request registration must happen before the request is sent. That ordering is what
Expand All @@ -198,7 +199,13 @@ internal void RegisterPendingCallback<TCallback>(
//
// Duplicate IDs are treated as a hard error because they would allow two unrelated
// requests to compete for the same completion slot.
var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel);
//
// dispatchToMainThread defaults to true: completion runs on Unity's main thread via
// SynchronizationContext.Post, which is safe for any onComplete that touches Unity
// APIs or fires user events. Pass false when the onComplete only mutates volatile
// YieldInstruction state — RouteFfiEvent will then run it inline on the FFI callback
// thread instead of paying a frame of latency for the post.
var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel, dispatchToMainThread);
if (!pendingCallbacks.TryAdd(requestAsyncId, pending))
{
throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}");
Expand Down Expand Up @@ -278,6 +285,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)

var respData = new Span<byte>(data.ToPointer()!, (int)size.ToUInt64());
var response = FfiEvent.Parser!.ParseFrom(respData);
RouteFfiEvent(response);
}

// Routing logic split out from FFICallback so tests can drive it from a
// chosen thread without going through the P/Invoke entry point. Running
// production traffic still always lands here via FFICallback above.
internal static void RouteFfiEvent(FfiEvent response)
{
if (_isDisposed) return;

// Audio stream events are handled directly on the FFI callback thread
// to bypass the main thread, since the audio thread consumes the data
Expand All @@ -287,6 +303,47 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)
return;
}

// Log batches are forwarded directly. UnityEngine.Debug.unityLogger is
// documented thread-safe; Unity's logger queues to its console drain
// internally. Skipping the main-thread post means logs reach the
// console without a one-frame delay — useful during error storms,
// panics, or LK_VERBOSE noise where the post queue could otherwise
// back up.
if (response.MessageCase == FfiEvent.MessageOneofCase.Logs)
{
Utils.HandleLogBatch(response.Logs);
return;
}

// Byte stream reader events feed an internal incremental-read buffer that
// already serializes mutations under its own lock. Skipping the main-thread
// post lets chunks land in the buffer immediately rather than waiting for
// the next frame drain.
if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent)
{
Instance.ByteStreamReaderEventReceived?.Invoke(response.ByteStreamReaderEvent!);
return;
}

// Same treatment for text stream readers — they share
// ReadIncrementalInstructionBase<TContent> with the byte path, so the
// lock added there already protects all state mutations.
if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent)
{
Instance.TextStreamReaderEventReceived?.Invoke(response.TextStreamReaderEvent!);
return;
}

// One-shot completions registered with dispatchToMainThread:false also bypass the
// main thread. The pending callback's onComplete only mutates volatile
// YieldInstruction fields, so resolving it here saves up to one frame of latency
// on async ops like SetMetadata / UnpublishTrack / stream Read/Write/Close.
var requestAsyncId = ExtractRequestAsyncId(response);
if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
{
return;
}

// Run on the main thread, the order of execution is guaranteed by Unity
// It uses a Queue internally
Instance._context?.Post(static (resp) =>
Expand Down Expand Up @@ -316,9 +373,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)

switch (ffiEvent.MessageCase)
{
case FfiEvent.MessageOneofCase.Logs:
Utils.HandleLogBatch(ffiEvent.Logs);
break;
case FfiEvent.MessageOneofCase.PublishData:
break;
case FfiEvent.MessageOneofCase.RoomEvent:
Expand All @@ -338,15 +392,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)
case FfiEvent.MessageOneofCase.VideoStreamEvent:
Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!);
break;
case FfiEvent.MessageOneofCase.AudioStreamEvent:
Instance.AudioStreamEventReceived?.Invoke(ffiEvent.AudioStreamEvent!);
break;
case FfiEvent.MessageOneofCase.ByteStreamReaderEvent:
Instance.ByteStreamReaderEventReceived?.Invoke(ffiEvent.ByteStreamReaderEvent!);
break;
case FfiEvent.MessageOneofCase.TextStreamReaderEvent:
Instance.TextStreamReaderEventReceived?.Invoke(ffiEvent.TextStreamReaderEvent!);
break;
case FfiEvent.MessageOneofCase.DataTrackStreamEvent:
Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!);
break;
Expand All @@ -357,7 +402,7 @@ private static void DispatchEvent(FfiEvent ffiEvent)
}
}

private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
{
// Remove-first dispatch is the key race-proofing step.
//
Expand Down Expand Up @@ -385,6 +430,26 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
return false;
}

// Inline-completion fast path for one-shot callbacks whose onComplete only
// mutates volatile YieldInstruction state (no Unity APIs, no user-event
// invocations). Returning true means the caller has been fully handled here
// — no further dispatch needed. Returning false means the caller should fall
// through to its normal main-thread post path. Same race model as
// TryDispatchPendingCallback: the side that wins TryRemove is the only side
// that may invoke the completion.
internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent)
{
if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending))
{
return false;
}
if (pending.DispatchToMainThread)
{
return false;
}
return TryDispatchPendingCallback(requestAsyncId, ffiEvent);
}

private static ulong? ExtractRequestAsyncId(FfiEvent ffiEvent)
{
// This switch is only concerned with one-shot async completion callbacks that echo
Expand Down Expand Up @@ -420,6 +485,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)

private abstract class PendingCallbackBase
{
public abstract bool DispatchToMainThread { get; }
public abstract bool TryComplete(FfiEvent ffiEvent);
public abstract void Cancel();
}
Expand All @@ -429,16 +495,21 @@ private sealed class PendingCallback<TCallback> : PendingCallbackBase where TCal
private readonly Func<FfiEvent, TCallback?> selector;
private readonly Action<TCallback> onComplete;
private readonly Action? onCancel;
private readonly bool dispatchToMainThread;

public override bool DispatchToMainThread => dispatchToMainThread;

public PendingCallback(
Func<FfiEvent, TCallback?> selector,
Action<TCallback> onComplete,
Action? onCancel
Action? onCancel,
bool dispatchToMainThread
)
{
this.selector = selector;
this.onComplete = onComplete;
this.onCancel = onCancel;
this.dispatchToMainThread = dispatchToMainThread;
}

public override bool TryComplete(FfiEvent ffiEvent)
Expand Down
9 changes: 6 additions & 3 deletions Runtime/Scripts/Internal/FfiInstruction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ internal FfiInstruction(
{
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}

Expand Down Expand Up @@ -63,7 +64,8 @@ internal FfiStreamInstruction(
Error = new StreamError("Canceled");
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}

Expand Down Expand Up @@ -116,7 +118,8 @@ internal FfiStreamResultInstruction(
Error = new StreamError("Canceled");
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}
}
Loading
Loading