Skip to content

Default to Zenoh transport on macOS and document replay workflow#1906

Open
bogwi wants to merge 36 commits intodimensionalOS:devfrom
bogwi:feat/integrate-zenoh
Open

Default to Zenoh transport on macOS and document replay workflow#1906
bogwi wants to merge 36 commits intodimensionalOS:devfrom
bogwi:feat/integrate-zenoh

Conversation

@bogwi
Copy link
Copy Markdown
Collaborator

@bogwi bogwi commented Apr 23, 2026

Supersedes #1787.

This PR carries forward the Zenoh transport integration from #1787 and wraps it into a merge-ready branch that fixes the remaining macOS Big Office replay gap.

Validation

Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):

dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2

The same workload on Linux (default remains lcm until you opt in):

dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2

Notes

  • this PR is intended as the wrapped successor to Feat/integrate zenoh #1787, not a separate redesign
  • Linux behavior remains unchanged by default: explicit Zenoh still works, and the default transport remains LCM

@bogwi bogwi changed the title Wrap Zenoh integration for macOS Big Office replay Default to Zenoh transport on macOS and document replay workflow Apr 23, 2026
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 23, 2026

Greptile Summary

This PR adds Zenoh as an alternative stream transport to LCM, defaulting to it on macOS when eclipse-zenoh is installed, and documents the replay workflow for the Big Office dataset. The implementation adds ZenohTransport/pZenohTransport wrappers, a shared session-singleton ZenohService, and transport-aware pubsub resolution in the Rerun bridge.

  • New zenoh optional extra (eclipse-zenoh>=1.0.0,<2.0) in pyproject.toml, guarded throughout by ZENOH_AVAILABLE so non-Zenoh environments are unaffected.
  • GlobalConfig.transport field drives the selection between \"lcm\" and \"zenoh\" globally; _default_transport() encodes the platform heuristic (macOS + Zenoh installed → zenoh).
  • Bridge pubsub resolution (_resolve_pubsubs) treats the legacy [LCM()] default as backward-compatible and adds both Zenoh() and LCM() when transport is Zenoh (LCM is still needed for TF frames).

Confidence Score: 5/5

Safe to merge — new Zenoh transport is fully opt-in and guarded by ZENOH_AVAILABLE; all existing LCM paths are unmodified.

All previously flagged issues (importorskip guards, _sessions race in test cleanup, bridge pubsubs field ignored) have been addressed in the intervening commits. The new transport branching, session-singleton management, and bridge pubsub resolution are correct and covered by tests. Linux defaults remain unchanged. The Zenoh implementation is conditionally compiled and cannot regress non-Zenoh environments.

No files require special attention.

Important Files Changed

Filename Overview
dimos/core/global_config.py Adds transport: TransportBackend field with platform-aware default factory; validate_assignment=True added so update() calls validate the new field correctly.
dimos/core/transport.py Adds ZENOH_AVAILABLE sentinel and full ZenohTransport/pZenohTransport implementations inside a if ZENOH_AVAILABLE: guard; existing LCM/DDS paths are unmodified.
dimos/protocol/service/zenohservice.py New singleton Zenoh session manager; unconditional import zenoh at module level is safe because this file is only imported inside ZENOH_AVAILABLE guards elsewhere.
dimos/protocol/pubsub/impl/zenohpubsub.py New Zenoh pubsub implementation with LCM and pickle encoder variants; key-expression encoding/decoding for typed messages is well-tested and correctly documented with its known limitation.
dimos/visualization/rerun/bridge.py Adds _default_pubsubs/_resolve_pubsubs for transport-driven pubsub selection; correctly adds both Zenoh and LCM when transport=zenoh to preserve TF frame updates. Entity path stripping logic for Zenoh key expressions looks correct.
dimos/core/coordination/module_coordinator.py Transport branching in _get_transport_for correctly prefixes Zenoh topics with ZENOH_DIMOS_KEY_PREFIX; LCM configurators still run unconditionally as documented.
dimos/core/test_zenoh_transport.py New test suite; pytest.importorskip("zenoh") guards the entire module, allowing platform-agnostic suites to be skipped cleanly when Zenoh is absent rather than failing collection.
dimos/core/test_utils.py New import-safe shared test helper (retry_until) extracted to avoid pulling in zenoh at collection time in non-Zenoh environments.
docs/usage/transports/index.md Documents Zenoh quickstart, platform defaults table, and CLI vs env-var override paths; new content is accurate for its described scope.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[CLI / GlobalConfig init] --> B{platform.system == Darwin
and ZENOH_AVAILABLE?}
    B -- Yes --> C[transport = 'zenoh']
    B -- No --> D[transport = 'lcm']
    C --> E[ModuleCoordinator._get_transport_for]
    D --> E
    E --> F{global_config.transport}
    F -- lcm --> G[LCMTransport or pLCMTransport
topic: '/name']
    F -- zenoh --> H{ZENOH_AVAILABLE?}
    H -- No --> I[RuntimeError: install hint]
    H -- Yes --> J[ZenohTransport or pZenohTransport
topic: 'dimos/name']
    G --> K[Module streams live on LCM multicast]
    J --> L[Module streams live on Zenoh session]
    L --> M[RerunBridgeModule._resolve_pubsubs]
    K --> M
    M --> N{pubsubs explicitly overridden
with non-LCM backend?}
    N -- Yes --> O[Return custom pubsubs]
    N -- No --> P{transport == 'zenoh'?}
    P -- Yes --> Q[Return Zenoh + LCM
# LCM needed for TF frames]
    P -- No --> R[Return LCM only]
Loading

Reviews (13): Last reviewed commit: "add pytest.importorskip("zenoh") guard" | Re-trigger Greptile

Comment thread dimos/core/test_zenoh_transport.py
Comment thread dimos/visualization/rerun/bridge.py Outdated
Comment thread dimos/protocol/service/test_zenohservice.py Outdated
@bogwi bogwi force-pushed the feat/integrate-zenoh branch 2 times, most recently from 4141991 to fd20ca8 Compare May 4, 2026 13:52
@bogwi
Copy link
Copy Markdown
Collaborator Author

bogwi commented May 4, 2026

Greptile Summary

  • P1 — wrong env-var name in docs: docs/usage/transports/index.md (and osx.md) document DIMOS_TRANSPORT=zenoh as the env-var override, but GlobalConfig uses pydantic_settings.BaseSettings with no env_prefix, so the actual variable pydantic-settings reads is TRANSPORT. Setting DIMOS_TRANSPORT will be silently ignored.

Confidence Score: 4/5

Safe to merge with the env-var name corrected in docs; runtime behavior is unaffected by the docs bug.

One P1 finding: the documented env-var DIMOS_TRANSPORT will silently have no effect because GlobalConfig has no env_prefix. The code itself is correct and well-tested; the bug is isolated to documentation. P1 ceiling is 4/5.

docs/usage/transports/index.md and docs/installation/osx.md — both reference the incorrect DIMOS_TRANSPORT env-var name.

This is not the purpose of this branch to add these changes to global_config.py:

model_config = SettingsConfigDict(
        env_prefix="DIMOS_",
        ...

It will be breaking cause anyone relying on unprefixed env vars for GlobalConfig will no longer get that effect; they need DIMOS_ for those settings to load from the environment / .env even though the cli.md in line 53 says, - "Environment variables and .env values must be prefixed with DIMOS_" . That is not yet done, but the docs for this PR have to be aligned so to not tackle them later in yet a new PR.

@bogwi
Copy link
Copy Markdown
Collaborator Author

bogwi commented May 4, 2026

  1. The branch defaults to zenoh on Mac yet needs a flag on the rest.

From dimos/docs/usage/transports/index.md

Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):

dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2

The same workload on Linux (default remains lcm until you opt in):

dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2
  1. All issues mentioned in the opening PR Feat/integrate zenoh #1787 and recent Finish Zenoh integration Finish Zenoh integration #1941 were addressed.

Comment thread docs/usage/transports/index.md
@bogwi bogwi force-pushed the feat/integrate-zenoh branch from 593bb38 to 791b722 Compare May 5, 2026 00:33
@leshy leshy added the PlzReview label May 5, 2026
from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators

configurators = [*lcm_configurators(), *blueprint.configurator_checks]
lcm_checks = lcm_configurators() if global_config.transport == "lcm" else []
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the global transport is zenoh, we still need to run LCM configurators so that LCM works for individual LCM transports. LCM configurators can only be skipped if we remove LCM entirely, no?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about a small bool helper:

def _blueprint_specifies_lcm_transport(blueprint: Blueprint) -> bool:
    """True when the merged blueprint wires at least one stream over LCM explicitly."""
    for transport in blueprint.transport_map.values():
        if isinstance(transport, (LCMTransport, pLCMTransport)):
            return True
    return False

then wire it:

def _run_configurators(blueprint: Blueprint) -> None:
        ...
        
    need_lcm_system_config = global_config.transport == "lcm" or _blueprint_specifies_lcm_transport(
            blueprint
        )
    lcm_checks = lcm_configurators() if need_lcm_system_config else []
    configurators = [*lcm_checks, *blueprint.configurator_checks]
    
        ...
        

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's sufficient, because we sometimes use LCM outside of module transports. For example see pLCMTransport("/human_input") in utils/cli/human/humancli.py.

My assumption is that we need to always configure LCM until we remove it (if we do that).

Copy link
Copy Markdown
Collaborator Author

@bogwi bogwi May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then let's make lcm_checks = lcm_configurators(). (No bool helper proposed is needed)

def _run_configurators(blueprint: Blueprint) -> None:
    from dimos.protocol.service.system_configurator.base import configure_system
    from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators

    lcm_checks = lcm_configurators()
    configurators = [*lcm_checks, *blueprint.configurator_checks]

    try:
        configure_system(configurators)
    except SystemExit:
        labels = [type(c).__name__ for c in configurators]
        print(
            f"Required system configuration was declined: {', '.join(labels)}",
            file=sys.stderr,
        )
        sys.exit(1)

?

Comment thread dimos/protocol/pubsub/impl/zenohpubsub.py Outdated
Comment thread docs/development/testing.md Outdated
Comment thread dimos/visualization/rerun/test_viewer_integration.py Outdated
Comment thread dimos/core/test_zenoh_transport.py Outdated
Comment thread dimos/core/test_zenoh_transport.py Outdated
Comment thread docs/usage/cli.md
Comment thread dimos/core/coordination/module_coordinator.py Outdated
Comment thread dimos/protocol/service/zenohservice.py
@leshy leshy mentioned this pull request May 7, 2026
1 task
@bogwi bogwi force-pushed the feat/integrate-zenoh branch 3 times, most recently from 962aca2 to c338ce7 Compare May 8, 2026 15:09
Comment thread dimos/core/test_zenoh_transport.py
@codecov
Copy link
Copy Markdown

codecov Bot commented May 8, 2026

vrinek added 4 commits May 9, 2026 01:30
Prepare the codebase for Zenoh integration without changing behavior.
All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs).

- Add `transport` field to GlobalConfig (default: "lcm")
- Add ZENOH_AVAILABLE guard in transport.py
- Branch _get_transport_for() on global_config.transport
- Gate LCM configurators to only run when transport is "lcm"
- Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard
- Add zenohpubsub.py stub (raises NotImplementedError)
- Add `zenoh` optional dependency group in pyproject.toml
- Add test_zenoh_transport.py covering all new conditional branches
TDD: tests written first, then implementation.
Follows DDSService pattern — module-level session dict with lock.

- ZenohConfig with mode/connect/listen fields and session_key
- ZenohService.start() opens session if not exists for config
- ZenohService.stop() does NOT close shared session
- session property raises RuntimeError if not started
- Two services with same config share one session (8 tests pass)
TDD: tests written first, then implementation.

- ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes])
- Publisher caching per key expression (avoids re-declaring)
- Subscriber tracking for cleanup on stop()
- Idempotent unsubscribe (guards against Zenoh ZError)
- subscribe_all() via dimos/** wildcard
- Zenoh and PickleZenoh composed classes (encoder mixins)
- 7 unit tests pass
Both encoder-composed variants pass all spec conformance tests:
- test_store, test_multiple_subscribers, test_unsubscribe
- test_multiple_messages, test_async_iterator
- 25 total tests pass (10 new Zenoh tests)
vrinek and others added 29 commits May 9, 2026 01:30
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully
  removed the subscriber from the list. If stop() already cleared the
  list, unsubscribe() returns without double-undeclaring.
- Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except
  to prevent malformed payloads from crashing Zenoh's internal thread.
Check membership before removing instead of catching ValueError.
Reads more clearly and avoids using exceptions for control flow.
Two issues prevented the Rerun bridge from showing data over Zenoh:

1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at
   start() using self.config.g.transport from the worker's GlobalConfig.

2. Zenoh key expressions cannot contain '#' (forbidden character).
   Type info is now embedded as a '/' segment in the key expression
   (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic
   reconstructs the Topic with lcm_type for subscribe_all decoding.

Also fixes entity path mapping to strip the dimos/ prefix so Zenoh
entity paths match LCM paths in the Rerun viewer.
- typed_out/untyped_out → typed_data/untyped_data
- Use TypedMsg instead of Image for blueprint integration tests
- Image still used in transport wrapper test (real LCM round-trip)
Replace raw time.sleep() calls with named helpers that document intent.
wait_for_subscribers() explains Zenoh has no "subscriber ready" signal.
Replace manual if-both-received check with threading.Barrier(2).
The previous approach could miss the event if both callbacks ran
concurrently and checked the other's list before it was populated.
Review findings dimensionalOS#2 and dimensionalOS#4:
- Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved
  lazily at start() from global_config.transport
- Remove _zenoh_topic field from pZenohTransport — construct on demand
  like pLCMTransport does, avoiding dual state
8 new tests covering:
- Typed/untyped topic → key expression conversion
- Key expression → topic with known/unknown/missing type
- Default lcm_type fallback
- Round-trip typed and untyped

Also documents known limitation: if a topic's base path ends with a
segment matching a registered DimosMsg type name, _key_expr_to_topic
will incorrectly split it. In practice this doesn't happen because
stream names (cmd_vel, lidar) don't match type names.
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule.
Removing the field caused a Pydantic ValidationError (extra_forbidden).
Keep the field but document that it's ignored — start() resolves
the pubsub backend from global_config.transport instead.
TF (transform frames) is hardcoded to LCM in the Module base class.
When transport=zenoh, module streams use Zenoh but TF stays on LCM.
The bridge now listens on both so the robot pose updates in the viewer.
Zenoh tests used time.sleep() to wait for subscriber propagation,
which is either too slow or too flaky in CI. Replace with _retry_until()
that re-publishes in a tight loop until the subscriber's Event fires.
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of
the local project, uninstalling other dependencies. The zenoh extra
only needs eclipse-zenoh — base deps are already installed.
… missing

Align with module_coordinator._get_transport_for: raise RuntimeError instead
of silently falling back to LCM when transport is zenoh and eclipse-zenoh is
not installed.
Agent and MCP conftest subscribe on LCM while the coordinator uses
global_config.transport, which defaults to Zenoh on Darwin when Zenoh is
installed. Set transport to lcm so spies and modules share the same backend.

The module reloading test runs a separate Python process; set transport to
lcm in the REPL before ModuleCoordinator.build for the same reason.
@bogwi bogwi force-pushed the feat/integrate-zenoh branch from c338ce7 to 1ab1916 Compare May 8, 2026 16:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants