Skip to content

embedded-software-laboratory/StreamRTPS

Repository files navigation

Bandwidth-Efficient RTPS Extensions for embeddedRTPS

This repository extends embeddedRTPS with three wire-protocol extensions that improve the bandwidth efficiency of the Real-Time Publish Subscribe (RTPS) protocol underlying the Data Distribution Service (DDS). Conventional RTPS pays fixed per-message overheads, a full RTPS header on every datagram, separate UDP/IP headers per sample, and periodic heartbeats independent of traffic, that dominate the wire footprint of small, periodic samples typical for embedded and automotive workloads. The three mechanisms in this fork, stream headers, payload aggregation, and predictive heartbeat suppression, attack each of those overheads in turn while remaining interoperable with stock RTPS peers. Activation is negotiated through DDS discovery, so an extended participant talks RTPS to legacy peers and uses the compact encodings only with peers that advertise support. Experimental results show that stream headers reduce bandwidth consumption by up to 27.9 % versus conventional RTPS under best-effort transport, and that heartbeat suppression yields a further 22.7 % reduction on top of stream headers under reliable transport, while preserving transmission latency in both cases.

Upstream: The unmodified embeddedRTPS reference implementation lives at https://github.com/embedded-software-laboratory/embeddedRTPS. This fork adds the stream-negotiation, payload-aggregation, and heartbeat-policy layers on top of that codebase.

What the three extensions do

A standard RTPS writer prepends a full RTPS header (GuidPrefix, version, vendor, etc.) to every datagram, sends one UDP packet per sample even when several samples share a locator, and emits heartbeats on a fixed schedule regardless of whether the receiver could already predict the next sample. Each extension renegotiates one of those contracts:

  1. Stream Negotiation (STNE): During discovery, writer and reader exchange the static parts of the RTPS header (protocol version, vendor, GuidPrefix, flags, header format) once and bind them to a compact 2 B StreamId. At runtime the writer replaces the full RTPS header with that identifier; the receiver reconstructs the header from the per-stream context on arrival. Unsupported peers fall back to the standard RTPS header.
  2. Payload Aggregation: Samples bound for the same locator are batched into a single UDP datagram by a per-locator aggregator with either a deadline (max-latency) or period (rate-locked) trigger. This amortizes the IP/UDP header cost across multiple samples and combines naturally with stream headers, since a stream-encoded batch carries one shared 2 B identifier instead of one full RTPS header per submessage.
  3. Predictive Heartbeat Suppression: For reliable streams whose sample arrivals look periodic, a continuity heartbeat policy suppresses heartbeats entirely while the inter-arrival variance stays below threshold, since a receiver that has seen a steady cadence can detect loss from the cadence alone. The policy falls back to standard periodic or adaptive heartbeats on detected loss, timing violations, or irregular arrival patterns. Two additional modes (FixedFrequency, AdaptiveFrequency) cover non-periodic traffic.

All three mechanisms preserve RTPS compatibility: the DDS discovery exchange is extended with a FeatureQOS block (supportStreams, supportAggregation, heartbeatAggregation, heartbeatPolicy) so each pair of participants only enables what both sides advertise.

Core components

All extension code lives next to the upstream embeddedRTPS sources and is namespaced consistently. The pieces that make up the three features are:

Area Files Role
Feature negotiation include/rtps/config/FeatureQOS.h FeatureQOS block carried in discovery: per-participant flags for streams, payload aggregation, heartbeat aggregation, and the HeartbeatPolicyMode selector.
Stream negotiation agent include/rtps/discovery/STNEAgent.h Stream Negotiation Endpoint: runs the writer↔reader handshake that binds a StreamId_t to the static header context and retries on loss.
Stream handshake wire data include/rtps/discovery/StreamHandshake.h Serializable StreamHandshake / StreamAck records exchanged over the STNE builtin endpoint (writerGuid, readerGuid, version, vendor, inlineQOS, flags, StreamHeaderFormat_t).
Stream / aggregated header types include/rtps/common/types.h StreamId_t, StreamStatus_t, StreamHeaderFormat_t (SIMPLE / AGGREGATED / RELIABLE), ContinuityMode_t.
Payload aggregator include/rtps/entities/Aggregator.h, Aggregator.tpp Per-locator aggregator base plus DeadlineAggregator (flush at max-latency deadline) and PeriodicAggregator (flush on a fixed period). Optionally prepends a shared stream/RTPS header before flushing.
Heartbeat policy include/rtps/entities/transient/HeartbeatPolicy.h, HeartbeatPolicy.tpp shouldSend() / onAckNack() / onNewChange() policy interface. Implements FixedFrequency, AdaptiveFrequency, and Continuity modes (the suppression strategy with ACQUIRE / IMPLICIT / EXPLICIT states).
Stateful writer integration include/rtps/entities/StatefulWriter.h, StatefulWriter.tpp Owns the HeartbeatPolicy instance and consults it instead of a hard-coded period.
Message receiver include/rtps/messages/MessageReceiver.h Reconstructs the full RTPS header from a StreamId lookup when a stream-encoded datagram arrives.

Directory structure

Directory Description
include/rtps/config/ FeatureQOS: discovery-level activation of the three extensions
include/rtps/discovery/ STNEAgent, StreamHandshake next to SPDP/SEDP
include/rtps/entities/ Aggregator (deadline / periodic), modified StatefulWriter
include/rtps/entities/transient/ HeartbeatPolicy (fixed / adaptive / continuity), AckNackPolicy, rate-lock constants
include/rtps/messages/ MessageReceiver with stream-header reconstruction
tracer/ Tracer node executables used for the bandwidth/latency evaluation
tracer/idl/ IDL sources for the tracer payload types
example/ Request/response examples (ReqResp, ReqRespRel, ReqRespMult, MessageChain)
tools/ Code generation (Micro-XRCE-DDS-Gen) for message types
thirdparty/ Bundled dependencies (e.g. Micro-CDR)

Dependencies

All dependencies are integrated as submodules. Make sure to use git clone --recursive when cloning.

Runtime (on nodes): LTTng-UST (liblttng-ust.so) for userspace tracing. Install via apt install liblttng-ust-dev or the SOATracer Ansible playbook.

Build-time: CMake, C++17 compiler, LTTng-UST headers, nlohmann-json (for tracer JSON config parsing).

Build

git submodule update --init --recursive
mkdir build && cd build
cmake ..
make -j4

License

MIT. See LICENSE. The base embeddedRTPS implementation that this work is derived from is published under the same terms at https://github.com/embedded-software-laboratory/embeddedRTPS.

About

This repository extends embeddedRTPS with three wire-protocol extensions that improve the bandwidth efficiency of the Real-Time Publish Subscribe (RTPS) protocol underlying the Data Distribution Service (DDS).

Resources

License

Stars

Watchers

Forks

Contributors