Default to Zenoh transport on macOS and document replay workflow#1906
Default to Zenoh transport on macOS and document replay workflow#1906bogwi wants to merge 36 commits intodimensionalOS:devfrom
Conversation
Greptile SummaryThis PR adds Zenoh as an alternative stream transport to LCM, defaulting to it on macOS when
Confidence Score: 5/5Safe to merge — new Zenoh transport is fully opt-in and guarded by ZENOH_AVAILABLE; all existing LCM paths are unmodified. All previously flagged issues (importorskip guards, _sessions race in test cleanup, bridge pubsubs field ignored) have been addressed in the intervening commits. The new transport branching, session-singleton management, and bridge pubsub resolution are correct and covered by tests. Linux defaults remain unchanged. The Zenoh implementation is conditionally compiled and cannot regress non-Zenoh environments. No files require special attention. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[CLI / GlobalConfig init] --> B{platform.system == Darwin
and ZENOH_AVAILABLE?}
B -- Yes --> C[transport = 'zenoh']
B -- No --> D[transport = 'lcm']
C --> E[ModuleCoordinator._get_transport_for]
D --> E
E --> F{global_config.transport}
F -- lcm --> G[LCMTransport or pLCMTransport
topic: '/name']
F -- zenoh --> H{ZENOH_AVAILABLE?}
H -- No --> I[RuntimeError: install hint]
H -- Yes --> J[ZenohTransport or pZenohTransport
topic: 'dimos/name']
G --> K[Module streams live on LCM multicast]
J --> L[Module streams live on Zenoh session]
L --> M[RerunBridgeModule._resolve_pubsubs]
K --> M
M --> N{pubsubs explicitly overridden
with non-LCM backend?}
N -- Yes --> O[Return custom pubsubs]
N -- No --> P{transport == 'zenoh'?}
P -- Yes --> Q[Return Zenoh + LCM
# LCM needed for TF frames]
P -- No --> R[Return LCM only]
Reviews (13): Last reviewed commit: "add pytest.importorskip("zenoh") guard" | Re-trigger Greptile |
4141991 to
fd20ca8
Compare
This is not the purpose of this branch to add these changes to It will be breaking cause anyone relying on unprefixed env vars for GlobalConfig will no longer get that effect; they need DIMOS_ for those settings to load from the environment / .env even though the cli.md in line 53 says, - "Environment variables and |
From Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required): dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2The same workload on Linux (default remains dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2
|
593bb38 to
791b722
Compare
| from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators | ||
|
|
||
| configurators = [*lcm_configurators(), *blueprint.configurator_checks] | ||
| lcm_checks = lcm_configurators() if global_config.transport == "lcm" else [] |
There was a problem hiding this comment.
Even if the global transport is zenoh, we still need to run LCM configurators so that LCM works for individual LCM transports. LCM configurators can only be skipped if we remove LCM entirely, no?
There was a problem hiding this comment.
How about a small bool helper:
def _blueprint_specifies_lcm_transport(blueprint: Blueprint) -> bool:
"""True when the merged blueprint wires at least one stream over LCM explicitly."""
for transport in blueprint.transport_map.values():
if isinstance(transport, (LCMTransport, pLCMTransport)):
return True
return Falsethen wire it:
def _run_configurators(blueprint: Blueprint) -> None:
...
need_lcm_system_config = global_config.transport == "lcm" or _blueprint_specifies_lcm_transport(
blueprint
)
lcm_checks = lcm_configurators() if need_lcm_system_config else []
configurators = [*lcm_checks, *blueprint.configurator_checks]
...
There was a problem hiding this comment.
I don't think that's sufficient, because we sometimes use LCM outside of module transports. For example see pLCMTransport("/human_input") in utils/cli/human/humancli.py.
My assumption is that we need to always configure LCM until we remove it (if we do that).
There was a problem hiding this comment.
Then let's make lcm_checks = lcm_configurators(). (No bool helper proposed is needed)
def _run_configurators(blueprint: Blueprint) -> None:
from dimos.protocol.service.system_configurator.base import configure_system
from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators
lcm_checks = lcm_configurators()
configurators = [*lcm_checks, *blueprint.configurator_checks]
try:
configure_system(configurators)
except SystemExit:
labels = [type(c).__name__ for c in configurators]
print(
f"Required system configuration was declined: {', '.join(labels)}",
file=sys.stderr,
)
sys.exit(1)?
962aca2 to
c338ce7
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Prepare the codebase for Zenoh integration without changing behavior. All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs). - Add `transport` field to GlobalConfig (default: "lcm") - Add ZENOH_AVAILABLE guard in transport.py - Branch _get_transport_for() on global_config.transport - Gate LCM configurators to only run when transport is "lcm" - Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard - Add zenohpubsub.py stub (raises NotImplementedError) - Add `zenoh` optional dependency group in pyproject.toml - Add test_zenoh_transport.py covering all new conditional branches
TDD: tests written first, then implementation. Follows DDSService pattern — module-level session dict with lock. - ZenohConfig with mode/connect/listen fields and session_key - ZenohService.start() opens session if not exists for config - ZenohService.stop() does NOT close shared session - session property raises RuntimeError if not started - Two services with same config share one session (8 tests pass)
TDD: tests written first, then implementation. - ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes]) - Publisher caching per key expression (avoids re-declaring) - Subscriber tracking for cleanup on stop() - Idempotent unsubscribe (guards against Zenoh ZError) - subscribe_all() via dimos/** wildcard - Zenoh and PickleZenoh composed classes (encoder mixins) - 7 unit tests pass
Both encoder-composed variants pass all spec conformance tests: - test_store, test_multiple_subscribers, test_unsubscribe - test_multiple_messages, test_async_iterator - 25 total tests pass (10 new Zenoh tests)
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully removed the subscriber from the list. If stop() already cleared the list, unsubscribe() returns without double-undeclaring. - Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except to prevent malformed payloads from crashing Zenoh's internal thread.
Check membership before removing instead of catching ValueError. Reads more clearly and avoids using exceptions for control flow.
Two issues prevented the Rerun bridge from showing data over Zenoh: 1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at start() using self.config.g.transport from the worker's GlobalConfig. 2. Zenoh key expressions cannot contain '#' (forbidden character). Type info is now embedded as a '/' segment in the key expression (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic reconstructs the Topic with lcm_type for subscribe_all decoding. Also fixes entity path mapping to strip the dimos/ prefix so Zenoh entity paths match LCM paths in the Rerun viewer.
- typed_out/untyped_out → typed_data/untyped_data - Use TypedMsg instead of Image for blueprint integration tests - Image still used in transport wrapper test (real LCM round-trip)
Replace raw time.sleep() calls with named helpers that document intent. wait_for_subscribers() explains Zenoh has no "subscriber ready" signal.
Replace manual if-both-received check with threading.Barrier(2). The previous approach could miss the event if both callbacks ran concurrently and checked the other's list before it was populated.
Review findings dimensionalOS#2 and dimensionalOS#4: - Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved lazily at start() from global_config.transport - Remove _zenoh_topic field from pZenohTransport — construct on demand like pLCMTransport does, avoiding dual state
8 new tests covering: - Typed/untyped topic → key expression conversion - Key expression → topic with known/unknown/missing type - Default lcm_type fallback - Round-trip typed and untyped Also documents known limitation: if a topic's base path ends with a segment matching a registered DimosMsg type name, _key_expr_to_topic will incorrectly split it. In practice this doesn't happen because stream names (cmd_vel, lidar) don't match type names.
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule. Removing the field caused a Pydantic ValidationError (extra_forbidden). Keep the field but document that it's ignored — start() resolves the pubsub backend from global_config.transport instead.
TF (transform frames) is hardcoded to LCM in the Module base class. When transport=zenoh, module streams use Zenoh but TF stays on LCM. The bridge now listens on both so the robot pose updates in the viewer.
Zenoh tests used time.sleep() to wait for subscriber propagation, which is either too slow or too flaky in CI. Replace with _retry_until() that re-publishes in a tight loop until the subscriber's Event fires.
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of the local project, uninstalling other dependencies. The zenoh extra only needs eclipse-zenoh — base deps are already installed.
… missing Align with module_coordinator._get_transport_for: raise RuntimeError instead of silently falling back to LCM when transport is zenoh and eclipse-zenoh is not installed.
…v pip instead of uv sync --extra zenoh
Agent and MCP conftest subscribe on LCM while the coordinator uses global_config.transport, which defaults to Zenoh on Darwin when Zenoh is installed. Set transport to lcm so spies and modules share the same backend. The module reloading test runs a separate Python process; set transport to lcm in the REPL before ModuleCoordinator.build for the same reason.
c338ce7 to
1ab1916
Compare
Supersedes #1787.
This PR carries forward the Zenoh transport integration from #1787 and wraps it into a merge-ready branch that fixes the remaining macOS Big Office replay gap.
Validation
Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):
The same workload on Linux (default remains
lcmuntil you opt in):Notes