Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 82 additions & 5 deletions dimos/agents/skills/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import time
from typing import Any

Expand All @@ -37,6 +38,14 @@
logger = setup_logger()


def _make_position_goal(x: float, y: float, yaw_rad: float, frame_id: str) -> PoseStamped:
return PoseStamped(
position=make_vector3(x, y, 0.0),
orientation=Quaternion.from_euler(make_vector3(0.0, 0.0, yaw_rad)),
frame_id=frame_id,
)


class NavigationSkillContainer(Module):
_latest_image: Image | None = None
_latest_odom: PoseStamped | None = None
Expand Down Expand Up @@ -243,6 +252,78 @@ def _navigate_using_semantic_map(self, query: str) -> str:
message = f"Found a location in the semantic map matching '{query}'."
return self._navigate_to(goal_pose, message)

@skill
def navigate_to_position(
self,
x: float,
y: float,
yaw_deg: float = 0.0,
frame_id: str = "map",
) -> str:
"""Navigate to an absolute position in the given frame.

Use this to act on a pose returned by another tool — pass its x, y, yaw_deg, and frame_id straight in.

Args:
x: target x in meters
y: target y in meters
yaw_deg: final heading in degrees, 0 = facing +x
frame_id: coordinate frame
"""
if not self._skill_started:
raise ValueError(f"{self} has not been started.")

goal = _make_position_goal(x, y, math.radians(yaw_deg), frame_id)
return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f}) in {frame_id}")

@skill
def rotate_toward_position(
self,
x: float,
y: float,
frame_id: str = "map",
) -> str:
"""Rotate in place to face an absolute position in the given frame.

Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry.

Args:
x: target x in meters
y: target y in meters
frame_id: coordinate frame
"""
if not self._skill_started:
raise ValueError(f"{self} has not been started.")

if self._latest_odom is None:
return "No odometry data received yet, cannot rotate."

cur_x = self._latest_odom.position.x
cur_y = self._latest_odom.position.y
yaw_rad = math.atan2(y - cur_y, x - cur_x)

goal = _make_position_goal(cur_x, cur_y, yaw_rad, frame_id)
return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f}) in {frame_id}")
Comment on lines +298 to +306
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Frame mismatch silently corrupts yaw and goal position

yaw_rad = math.atan2(y - cur_y, x - cur_x) subtracts self._latest_odom.position (which may be in the "odom" frame) from (x, y) (which the caller asserts is in frame_id, defaulting to "map"). When the two frames differ — common in ROS setups where the /odom topic publishes in the "odom" frame while navigation uses "map" — the difference vector is geometrically meaningless, producing a wrong heading. Additionally, _make_position_goal(cur_x, cur_y, yaw_rad, frame_id) stamps odom-frame coordinates with frame_id, so the navigation stack receives a pose with a mismatched header frame. There is no guard or warning when self._latest_odom.frame_id != frame_id.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In dimos, map↔world is published as an identity TF (dimos/navigation/rosnav.py:187-194), and existing skills follow the same implicit-frame convention — tag_location reads odom (frame "world") and stamps the result "map" (navigation.py:162-166), and _get_goal_pose_from_result hardcodes "map" (navigation.py:357). A frame_id string-equality check would actively break the default Go2 flow (odom publishes frame_id="world", default skill arg is "map") without catching any real geometric mismatch. Proper handling needs a TF-tree lookup applied uniformly across all nav skills — separate PR.


@skill
def current_pose(self) -> str:
"""Return the robot's current pose.

Use this to reason about a target pose relative to the robot (e.g. distance or bearing). Compare only against poses with the same frame_id.
"""
if not self._skill_started:
raise ValueError(f"{self} has not been started.")

if self._latest_odom is None:
return "No odometry data received yet."

data = self._latest_odom.agent_encode()
return (
f"Pose: ({data['x']}, {data['y']}, {data['z']})\n"
f"Yaw: {data['yaw_deg']}°\n"
f"Frame: {data['frame_id']}"
)

@skill
def stop_navigation(self) -> str:
"""Immediatly stop moving."""
Expand Down Expand Up @@ -273,8 +354,4 @@ def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | No
pos_y = first.get("pos_y", 0)
theta = first.get("rot_z", 0)

return PoseStamped(
position=make_vector3(pos_x, pos_y, 0),
orientation=Quaternion.from_euler(make_vector3(0, 0, theta)),
frame_id="map",
)
return _make_position_goal(pos_x, pos_y, theta, "map")
78 changes: 78 additions & 0 deletions dimos/agents/skills/test_navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from typing import Any

from langchain_core.messages import HumanMessage
Expand Down Expand Up @@ -118,6 +119,21 @@ def _navigate_using_semantic_map(self, query):
return f"Successfuly arrived at '{query}'"


class MockedPositionNavSkill(NavigationSkillContainer):
"""Direct-instantiation harness for navigate_to_position /
rotate_toward_position / current_pose tests. Skips the heavy parent
__init__ and records goals at the _navigate_to boundary."""

def __init__(self, latest_odom: PoseStamped | None = None) -> None:
self._skill_started = True
self._latest_odom = latest_odom
self.recorded_goals: list[PoseStamped] = []

def _navigate_to(self, pose: PoseStamped, message: str) -> str:
self.recorded_goals.append(pose)
return message


@pytest.mark.slow
def test_stop_movement(agent_setup) -> None:
history = agent_setup(
Expand Down Expand Up @@ -163,3 +179,65 @@ def test_go_to_semantic_location(agent_setup) -> None:
)

assert "success" in history[-1].content.lower()


def test_navigate_to_position_sets_goal_with_yaw() -> None:
skill = MockedPositionNavSkill()
skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0, frame_id="map")

assert len(skill.recorded_goals) == 1
goal = skill.recorded_goals[0]
assert goal.frame_id == "map"
assert goal.position.x == pytest.approx(3.0)
assert goal.position.y == pytest.approx(4.0)
assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1)


def test_rotate_toward_position_computes_yaw_from_odom() -> None:
skill = MockedPositionNavSkill(
latest_odom=PoseStamped(
position=(0.0, 0.0, 0.0),
orientation=(0.0, 0.0, 0.0, 1.0),
frame_id="map",
)
)
skill.rotate_toward_position(x=0.0, y=1.0, frame_id="map")

assert len(skill.recorded_goals) == 1
goal = skill.recorded_goals[0]
assert goal.position.x == pytest.approx(0.0)
assert goal.position.y == pytest.approx(0.0)
assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1)


def test_rotate_toward_position_without_odom_returns_message() -> None:
skill = MockedPositionNavSkill()

result = skill.rotate_toward_position(x=1.0, y=0.0, frame_id="map")

assert "no odometry" in result.lower()
assert skill.recorded_goals == []


def test_current_pose_returns_formatted_pose() -> None:
skill = MockedPositionNavSkill(
latest_odom=PoseStamped(
position=(1.0, 2.0, 0.0),
orientation=(0.0, 0.0, 0.0, 1.0),
frame_id="map",
)
)

result = skill.current_pose()

assert "Pose: (1.0, 2.0, 0.0)" in result
assert "Yaw: 0.0" in result
assert "Frame: map" in result


def test_current_pose_without_odom_returns_message() -> None:
skill = MockedPositionNavSkill()

result = skill.current_pose()

assert "no odometry" in result.lower()
19 changes: 18 additions & 1 deletion dimos/msgs/geometry_msgs/PoseStamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import math
import time
from typing import TYPE_CHECKING, BinaryIO, TypeAlias
from typing import TYPE_CHECKING, Any, BinaryIO, TypeAlias

if TYPE_CHECKING:
from rerun._baseclasses import Archetype
Expand Down Expand Up @@ -82,6 +82,23 @@ def __str__(self) -> str:
f"euler=[{math.degrees(self.roll):.1f}, {math.degrees(self.pitch):.1f}, {math.degrees(self.yaw):.1f}])"
)

def agent_encode(self) -> dict[str, Any]:
"""Render the pose for an LLM agent.

Returns a flat dict with ``frame_id``, ``x``, ``y``, ``z``,
``yaw_deg``. Pass these straight into navigation skills (e.g.
``navigate_to_position``) to act on the pose. To reason about
the pose relative to the robot, the agent should fetch its own
pose (e.g. via ``current_pose``) and combine the two.
"""
return {
"frame_id": self.frame_id,
"x": round(self.x, 3),
"y": round(self.y, 3),
"z": round(self.z, 3),
"yaw_deg": round(math.degrees(self.yaw), 1),
}

def to_rerun(self) -> Archetype:
"""Convert to rerun Transform3D format.

Expand Down
39 changes: 39 additions & 0 deletions dimos/msgs/geometry_msgs/test_PoseStamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import pickle
import time

import pytest

from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3


def test_lcm_encode_decode() -> None:
Expand Down Expand Up @@ -53,3 +58,37 @@ def test_pickle_encode_decode() -> None:
assert isinstance(pose_dest, PoseStamped)
assert pose_dest is not pose_source
assert pose_dest == pose_source


def test_agent_encode_returns_absolute_fields() -> None:
"""Test that agent_encode returns frame_id, x, y, z, and yaw_deg."""

pose = PoseStamped(
position=Vector3(1.0, 2.0, 3.0),
orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.0))),
frame_id="map",
)
encoded = pose.agent_encode()

assert set(encoded.keys()) == {"frame_id", "x", "y", "z", "yaw_deg"}
assert encoded["frame_id"] == "map"
assert encoded["x"] == pytest.approx(1.0)
assert encoded["y"] == pytest.approx(2.0)
assert encoded["z"] == pytest.approx(3.0)
assert encoded["yaw_deg"] == pytest.approx(45.0, abs=0.1)


def test_agent_encode_rounds_values() -> None:
"""Test that agent_encode rounds position to 3 dp and yaw to 1 dp."""

pose = PoseStamped(
position=Vector3(1.23456, 2.34567, 3.45678),
orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.123))),
frame_id="map",
)
encoded = pose.agent_encode()

assert encoded["x"] == pytest.approx(1.235)
assert encoded["y"] == pytest.approx(2.346)
assert encoded["z"] == pytest.approx(3.457)
assert encoded["yaw_deg"] == pytest.approx(45.1, abs=0.05)