From d12e692db58a0628d958cabf5bfea7945bd28299 Mon Sep 17 00:00:00 2001 From: douglasswng Date: Thu, 7 May 2026 11:16:50 +0800 Subject: [PATCH] feat(agents): agent-encode PoseStamped + position nav skills --- dimos/agents/skills/navigation.py | 87 ++++++++++++++++++-- dimos/agents/skills/test_navigation.py | 78 ++++++++++++++++++ dimos/msgs/geometry_msgs/PoseStamped.py | 19 ++++- dimos/msgs/geometry_msgs/test_PoseStamped.py | 39 +++++++++ 4 files changed, 217 insertions(+), 6 deletions(-) diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index d88bec452e..a7202fb94c 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import time from typing import Any @@ -37,6 +38,14 @@ logger = setup_logger() +def _make_position_goal(x: float, y: float, yaw_rad: float, frame_id: str) -> PoseStamped: + return PoseStamped( + position=make_vector3(x, y, 0.0), + orientation=Quaternion.from_euler(make_vector3(0.0, 0.0, yaw_rad)), + frame_id=frame_id, + ) + + class NavigationSkillContainer(Module): _latest_image: Image | None = None _latest_odom: PoseStamped | None = None @@ -243,6 +252,78 @@ def _navigate_using_semantic_map(self, query: str) -> str: message = f"Found a location in the semantic map matching '{query}'." return self._navigate_to(goal_pose, message) + @skill + def navigate_to_position( + self, + x: float, + y: float, + yaw_deg: float = 0.0, + frame_id: str = "map", + ) -> str: + """Navigate to an absolute position in the given frame. + + Use this to act on a pose returned by another tool — pass its x, y, yaw_deg, and frame_id straight in. + + Args: + x: target x in meters + y: target y in meters + yaw_deg: final heading in degrees, 0 = facing +x + frame_id: coordinate frame + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + goal = _make_position_goal(x, y, math.radians(yaw_deg), frame_id) + return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f}) in {frame_id}") + + @skill + def rotate_toward_position( + self, + x: float, + y: float, + frame_id: str = "map", + ) -> str: + """Rotate in place to face an absolute position in the given frame. + + Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry. + + Args: + x: target x in meters + y: target y in meters + frame_id: coordinate frame + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + return "No odometry data received yet, cannot rotate." + + cur_x = self._latest_odom.position.x + cur_y = self._latest_odom.position.y + yaw_rad = math.atan2(y - cur_y, x - cur_x) + + goal = _make_position_goal(cur_x, cur_y, yaw_rad, frame_id) + return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f}) in {frame_id}") + + @skill + def current_pose(self) -> str: + """Return the robot's current pose. + + Use this to reason about a target pose relative to the robot (e.g. distance or bearing). Compare only against poses with the same frame_id. + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + return "No odometry data received yet." + + data = self._latest_odom.agent_encode() + return ( + f"Pose: ({data['x']}, {data['y']}, {data['z']})\n" + f"Yaw: {data['yaw_deg']}°\n" + f"Frame: {data['frame_id']}" + ) + @skill def stop_navigation(self) -> str: """Immediatly stop moving.""" @@ -273,8 +354,4 @@ def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | No pos_y = first.get("pos_y", 0) theta = first.get("rot_z", 0) - return PoseStamped( - position=make_vector3(pos_x, pos_y, 0), - orientation=Quaternion.from_euler(make_vector3(0, 0, theta)), - frame_id="map", - ) + return _make_position_goal(pos_x, pos_y, theta, "map") diff --git a/dimos/agents/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py index c837492d50..395d4a10e5 100644 --- a/dimos/agents/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math from typing import Any from langchain_core.messages import HumanMessage @@ -118,6 +119,21 @@ def _navigate_using_semantic_map(self, query): return f"Successfuly arrived at '{query}'" +class MockedPositionNavSkill(NavigationSkillContainer): + """Direct-instantiation harness for navigate_to_position / + rotate_toward_position / current_pose tests. Skips the heavy parent + __init__ and records goals at the _navigate_to boundary.""" + + def __init__(self, latest_odom: PoseStamped | None = None) -> None: + self._skill_started = True + self._latest_odom = latest_odom + self.recorded_goals: list[PoseStamped] = [] + + def _navigate_to(self, pose: PoseStamped, message: str) -> str: + self.recorded_goals.append(pose) + return message + + @pytest.mark.slow def test_stop_movement(agent_setup) -> None: history = agent_setup( @@ -163,3 +179,65 @@ def test_go_to_semantic_location(agent_setup) -> None: ) assert "success" in history[-1].content.lower() + + +def test_navigate_to_position_sets_goal_with_yaw() -> None: + skill = MockedPositionNavSkill() + skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0, frame_id="map") + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.frame_id == "map" + assert goal.position.x == pytest.approx(3.0) + assert goal.position.y == pytest.approx(4.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_computes_yaw_from_odom() -> None: + skill = MockedPositionNavSkill( + latest_odom=PoseStamped( + position=(0.0, 0.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + ) + skill.rotate_toward_position(x=0.0, y=1.0, frame_id="map") + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.position.x == pytest.approx(0.0) + assert goal.position.y == pytest.approx(0.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_without_odom_returns_message() -> None: + skill = MockedPositionNavSkill() + + result = skill.rotate_toward_position(x=1.0, y=0.0, frame_id="map") + + assert "no odometry" in result.lower() + assert skill.recorded_goals == [] + + +def test_current_pose_returns_formatted_pose() -> None: + skill = MockedPositionNavSkill( + latest_odom=PoseStamped( + position=(1.0, 2.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + ) + + result = skill.current_pose() + + assert "Pose: (1.0, 2.0, 0.0)" in result + assert "Yaw: 0.0" in result + assert "Frame: map" in result + + +def test_current_pose_without_odom_returns_message() -> None: + skill = MockedPositionNavSkill() + + result = skill.current_pose() + + assert "no odometry" in result.lower() diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index acf0af8b32..722e4dce3d 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -16,7 +16,7 @@ import math import time -from typing import TYPE_CHECKING, BinaryIO, TypeAlias +from typing import TYPE_CHECKING, Any, BinaryIO, TypeAlias if TYPE_CHECKING: from rerun._baseclasses import Archetype @@ -82,6 +82,23 @@ def __str__(self) -> str: f"euler=[{math.degrees(self.roll):.1f}, {math.degrees(self.pitch):.1f}, {math.degrees(self.yaw):.1f}])" ) + def agent_encode(self) -> dict[str, Any]: + """Render the pose for an LLM agent. + + Returns a flat dict with ``frame_id``, ``x``, ``y``, ``z``, + ``yaw_deg``. Pass these straight into navigation skills (e.g. + ``navigate_to_position``) to act on the pose. To reason about + the pose relative to the robot, the agent should fetch its own + pose (e.g. via ``current_pose``) and combine the two. + """ + return { + "frame_id": self.frame_id, + "x": round(self.x, 3), + "y": round(self.y, 3), + "z": round(self.z, 3), + "yaw_deg": round(math.degrees(self.yaw), 1), + } + def to_rerun(self) -> Archetype: """Convert to rerun Transform3D format. diff --git a/dimos/msgs/geometry_msgs/test_PoseStamped.py b/dimos/msgs/geometry_msgs/test_PoseStamped.py index a486f33303..8a4d8ba18d 100644 --- a/dimos/msgs/geometry_msgs/test_PoseStamped.py +++ b/dimos/msgs/geometry_msgs/test_PoseStamped.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import pickle import time +import pytest + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 def test_lcm_encode_decode() -> None: @@ -53,3 +58,37 @@ def test_pickle_encode_decode() -> None: assert isinstance(pose_dest, PoseStamped) assert pose_dest is not pose_source assert pose_dest == pose_source + + +def test_agent_encode_returns_absolute_fields() -> None: + """Test that agent_encode returns frame_id, x, y, z, and yaw_deg.""" + + pose = PoseStamped( + position=Vector3(1.0, 2.0, 3.0), + orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.0))), + frame_id="map", + ) + encoded = pose.agent_encode() + + assert set(encoded.keys()) == {"frame_id", "x", "y", "z", "yaw_deg"} + assert encoded["frame_id"] == "map" + assert encoded["x"] == pytest.approx(1.0) + assert encoded["y"] == pytest.approx(2.0) + assert encoded["z"] == pytest.approx(3.0) + assert encoded["yaw_deg"] == pytest.approx(45.0, abs=0.1) + + +def test_agent_encode_rounds_values() -> None: + """Test that agent_encode rounds position to 3 dp and yaw to 1 dp.""" + + pose = PoseStamped( + position=Vector3(1.23456, 2.34567, 3.45678), + orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.123))), + frame_id="map", + ) + encoded = pose.agent_encode() + + assert encoded["x"] == pytest.approx(1.235) + assert encoded["y"] == pytest.approx(2.346) + assert encoded["z"] == pytest.approx(3.457) + assert encoded["yaw_deg"] == pytest.approx(45.1, abs=0.05)