This repository extends embeddedRTPS with three wire-protocol extensions that improve the bandwidth efficiency of the Real-Time Publish Subscribe (RTPS) protocol underlying the Data Distribution Service (DDS). Conventional RTPS pays fixed per-message overheads, a full RTPS header on every datagram, separate UDP/IP headers per sample, and periodic heartbeats independent of traffic, that dominate the wire footprint of small, periodic samples typical for embedded and automotive workloads. The three mechanisms in this fork, stream headers, payload aggregation, and predictive heartbeat suppression, attack each of those overheads in turn while remaining interoperable with stock RTPS peers. Activation is negotiated through DDS discovery, so an extended participant talks RTPS to legacy peers and uses the compact encodings only with peers that advertise support. Experimental results show that stream headers reduce bandwidth consumption by up to 27.9 % versus conventional RTPS under best-effort transport, and that heartbeat suppression yields a further 22.7 % reduction on top of stream headers under reliable transport, while preserving transmission latency in both cases.
Upstream: The unmodified embeddedRTPS reference implementation lives at https://github.com/embedded-software-laboratory/embeddedRTPS. This fork adds the stream-negotiation, payload-aggregation, and heartbeat-policy layers on top of that codebase.
A standard RTPS writer prepends a full RTPS header (GuidPrefix, version, vendor, etc.) to every datagram, sends one UDP packet per sample even when several samples share a locator, and emits heartbeats on a fixed schedule regardless of whether the receiver could already predict the next sample. Each extension renegotiates one of those contracts:
- Stream Negotiation (STNE): During discovery, writer and reader exchange the static parts of the RTPS header (protocol version, vendor, GuidPrefix, flags, header format) once and bind them to a compact 2 B
StreamId. At runtime the writer replaces the full RTPS header with that identifier; the receiver reconstructs the header from the per-stream context on arrival. Unsupported peers fall back to the standard RTPS header. - Payload Aggregation: Samples bound for the same locator are batched into a single UDP datagram by a per-locator aggregator with either a deadline (max-latency) or period (rate-locked) trigger. This amortizes the IP/UDP header cost across multiple samples and combines naturally with stream headers, since a stream-encoded batch carries one shared 2 B identifier instead of one full RTPS header per submessage.
- Predictive Heartbeat Suppression: For reliable streams whose sample arrivals look periodic, a continuity heartbeat policy suppresses heartbeats entirely while the inter-arrival variance stays below threshold, since a receiver that has seen a steady cadence can detect loss from the cadence alone. The policy falls back to standard periodic or adaptive heartbeats on detected loss, timing violations, or irregular arrival patterns. Two additional modes (
FixedFrequency,AdaptiveFrequency) cover non-periodic traffic.
All three mechanisms preserve RTPS compatibility: the DDS discovery exchange is extended with a FeatureQOS block (supportStreams, supportAggregation, heartbeatAggregation, heartbeatPolicy) so each pair of participants only enables what both sides advertise.
All extension code lives next to the upstream embeddedRTPS sources and is namespaced consistently. The pieces that make up the three features are:
| Area | Files | Role |
|---|---|---|
| Feature negotiation | include/rtps/config/FeatureQOS.h | FeatureQOS block carried in discovery: per-participant flags for streams, payload aggregation, heartbeat aggregation, and the HeartbeatPolicyMode selector. |
| Stream negotiation agent | include/rtps/discovery/STNEAgent.h | Stream Negotiation Endpoint: runs the writer↔reader handshake that binds a StreamId_t to the static header context and retries on loss. |
| Stream handshake wire data | include/rtps/discovery/StreamHandshake.h | Serializable StreamHandshake / StreamAck records exchanged over the STNE builtin endpoint (writerGuid, readerGuid, version, vendor, inlineQOS, flags, StreamHeaderFormat_t). |
| Stream / aggregated header types | include/rtps/common/types.h | StreamId_t, StreamStatus_t, StreamHeaderFormat_t (SIMPLE / AGGREGATED / RELIABLE), ContinuityMode_t. |
| Payload aggregator | include/rtps/entities/Aggregator.h, Aggregator.tpp | Per-locator aggregator base plus DeadlineAggregator (flush at max-latency deadline) and PeriodicAggregator (flush on a fixed period). Optionally prepends a shared stream/RTPS header before flushing. |
| Heartbeat policy | include/rtps/entities/transient/HeartbeatPolicy.h, HeartbeatPolicy.tpp | shouldSend() / onAckNack() / onNewChange() policy interface. Implements FixedFrequency, AdaptiveFrequency, and Continuity modes (the suppression strategy with ACQUIRE / IMPLICIT / EXPLICIT states). |
| Stateful writer integration | include/rtps/entities/StatefulWriter.h, StatefulWriter.tpp | Owns the HeartbeatPolicy instance and consults it instead of a hard-coded period. |
| Message receiver | include/rtps/messages/MessageReceiver.h | Reconstructs the full RTPS header from a StreamId lookup when a stream-encoded datagram arrives. |
| Directory | Description |
|---|---|
| include/rtps/config/ | FeatureQOS: discovery-level activation of the three extensions |
| include/rtps/discovery/ | STNEAgent, StreamHandshake next to SPDP/SEDP |
| include/rtps/entities/ | Aggregator (deadline / periodic), modified StatefulWriter |
| include/rtps/entities/transient/ | HeartbeatPolicy (fixed / adaptive / continuity), AckNackPolicy, rate-lock constants |
| include/rtps/messages/ | MessageReceiver with stream-header reconstruction |
| tracer/ | Tracer node executables used for the bandwidth/latency evaluation |
| tracer/idl/ | IDL sources for the tracer payload types |
| example/ | Request/response examples (ReqResp, ReqRespRel, ReqRespMult, MessageChain) |
| tools/ | Code generation (Micro-XRCE-DDS-Gen) for message types |
| thirdparty/ | Bundled dependencies (e.g. Micro-CDR) |
All dependencies are integrated as submodules. Make sure to use git clone --recursive when cloning.
Runtime (on nodes): LTTng-UST (liblttng-ust.so) for userspace tracing. Install via apt install liblttng-ust-dev or the SOATracer Ansible playbook.
Build-time: CMake, C++17 compiler, LTTng-UST headers, nlohmann-json (for tracer JSON config parsing).
git submodule update --init --recursive
mkdir build && cd build
cmake ..
make -j4MIT. See LICENSE. The base embeddedRTPS implementation that this work is derived from is published under the same terms at https://github.com/embedded-software-laboratory/embeddedRTPS.