diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1a5366e..dc77883 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: ["3.11", "3.12", "3.13"]
+ python-version: ["3.11", "3.12", "3.13", "3.14"]
env:
TERM: xterm-256color
FORCE_COLOR: 1
@@ -48,10 +48,10 @@ jobs:
FORCE_COLOR: 1
steps:
- uses: actions/checkout@v3
- - name: Set up Python 3.13
+ - name: Set up Python 3.14
uses: actions/setup-python@v4
with:
- python-version: 3.13
+ python-version: 3.14
- name: Install dependencies
run: |
python -m pip install --upgrade pip
diff --git a/docs/_static/plot_scalability.svg b/docs/_static/plot_scalability.svg
new file mode 100644
index 0000000..fd36e77
--- /dev/null
+++ b/docs/_static/plot_scalability.svg
@@ -0,0 +1,1848 @@
+
+
+
diff --git a/docs/changelog.rst b/docs/changelog.rst
index e829f7e..836d5c4 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,45 +1,54 @@
Changelog
=========
-Pre-Release v1.0.4a0 (2026-04-29)
+Pre-Release v1.0.4a1 (2026-05-05)
---------------------------------
General
^^^^^^^
1. The ``_step_callback`` of the ``BasicMagBotSingleAgentEnv`` now returns the possibly modified action instead of None.
+2. Added support for Python 3.14
+
+Performance
+^^^^^^^^^^^
+1. Added spatial hashing for mover-mover collision detection.
+2. Precompute per-tile wall-segment lookup table for wall collision checks.
+3. Added fast paths for uniform mover sizes and complete tile layouts in ``qpos_is_valid``.
+4. Cached episode-level Shapely polygons in ``StateBasedGlobalPushingEnv`` to avoid rebuilding them on every step.
+5. Removed ``scipy`` dependency and optimize rectangle vertex calculation in ``geometry_2D_utils``.
Bug Fixes
^^^^^^^^^
-1. Fix wrong mover bumper mass and corresponding examples in the documentation
-2. Fix wrong gainprm in all RL environments
-3. Ensure correct maximum dynamics of the actions (RL environments). This is fixed by using the ``_step_callback``.
+1. Fixed wrong mover bumper mass and corresponding examples in the documentation
+2. Fixed wrong gainprm in all RL environments
+3. Ensured correct maximum dynamics of the actions (RL environments). This is fixed by using the ``_step_callback``.
Release v1.0.3 (2026-03-25)
---------------------------
General
^^^^^^^
-1. Fix bug in ``StateBasedGlobalPushingEnv-v0``
-2. Update RL examples
+1. Fixed bug in ``StateBasedGlobalPushingEnv-v0``
+2. Updated RL examples
Release v1.0.2 (2026-02-27)
---------------------------
General
^^^^^^^
-1. Add the 6D-Platform MagBot (``SixDPlatformMagBotsAPM4330``)
-2. Add tutorial on how to add MagBots to custom environments
-3. Add example environments with MagBots (``SixDPlatformMagBotApplicationExampleEnv`` and ``SixDPlatformMagBotExampleEnv``)
+1. Added the 6D-Platform MagBot (``SixDPlatformMagBotsAPM4330``)
+2. Added tutorial on how to add MagBots to custom environments
+3. Added example environments with MagBots (``SixDPlatformMagBotApplicationExampleEnv`` and ``SixDPlatformMagBotExampleEnv``)
Release v1.0.1 (2025-11-24)
---------------------------
General
^^^^^^^
-1. Use MuJoCo functions in ``MoverImpedanceController`` instead of ``scipy.spatial.transform.Rotation`` (better performance, but ``scipy.spatial.transform.Rotation`` is not completely replaced)
-2. Add benchmark environments
-3. Add ``StateBasedPushXEnv-v0`` and ``StateBasedPushLEnv-v0``
-4. Update throughput calculation in pushing environments and update benchmark results accordingly
+1. Used MuJoCo functions in ``MoverImpedanceController`` instead of ``scipy.spatial.transform.Rotation`` (better performance, but ``scipy.spatial.transform.Rotation`` is not completely replaced)
+2. Added benchmark environments
+3. Added ``StateBasedPushXEnv-v0`` and ``StateBasedPushLEnv-v0``
+4. Updated throughput calculation in pushing environments and update benchmark results accordingly
Release v1.0.0 (2025-11-08)
---------------------------
diff --git a/docs/index.rst b/docs/index.rst
index cf2188a..43ec7c2 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -166,6 +166,19 @@ or `Tianshou `_.
The library contains a collection of existing RL environments in the field of Magnetic Robotics, which will continue to be updated in the future, as well as basic single-agent and
multi-agent environments that serve as starting points for the easy development of new research-specific environments.
+Simulation Performance
+^^^^^^^^^^^^^^^^^^^^^^
+Process time scales near-linearly from ~0.15 ms at 50 movers to ~2.7 ms at 1,000 movers, with
+negligible difference between the circle and box collision shapes. Each data point is the mean over
+100 simulation steps (applying controls, one MuJoCo integrator step, and collision checking), with
+tiles in a square grid and one mover per tile. Measured on a MacBook Pro (2020), Apple M1,
+16 GB RAM, macOS 26.2.
+
+.. image:: _static/plot_scalability.svg
+ :alt: Process time per simulation step vs. number of movers, for circle and box collision shapes
+ :align: center
+ :width: 80%
+
.. _citation:
Paper & Citation
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 905b777..fb50f4f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -38,6 +38,7 @@ outworldbody
params
pre
Pre
+Precompute
preconfigured
preprocessed
programmatically
diff --git a/magbotsim/__init__.py b/magbotsim/__init__.py
index 44600a3..d08b6e4 100644
--- a/magbotsim/__init__.py
+++ b/magbotsim/__init__.py
@@ -1,11 +1,11 @@
-__version__ = '1.0.4a0'
+__version__ = '1.0.4a1'
from gymnasium.envs.registration import register
from magbotsim.basic_magbot_env import BasicMagBotEnv
+from magbotsim.magbots.sixD_platform import SixDPlatformMagBotsAPM4330
from magbotsim.rl_envs.basic_multi_agent_env import BasicMagBotMultiAgentEnv
from magbotsim.rl_envs.basic_single_agent_env import BasicMagBotSingleAgentEnv
-from magbotsim.magbots.sixD_platform import SixDPlatformMagBotsAPM4330
from magbotsim.utils.benchmark_utils import BENCHMARK_PLANNING_LAYOUTS
from magbotsim.utils.impedance_control import MoverImpedanceController
from magbotsim.utils.rendering import Matplotlib2DViewer, MujocoViewerCollection
diff --git a/magbotsim/basic_magbot_env.py b/magbotsim/basic_magbot_env.py
index 12da9f4..497a8cb 100644
--- a/magbotsim/basic_magbot_env.py
+++ b/magbotsim/basic_magbot_env.py
@@ -228,6 +228,8 @@ def __init__(
self.idx_x_tiles_2x2_tr, self.idx_y_tiles_2x2_tr = self.get_tile_indices_mask(mask=mask_2x2_tr)
# padded layout used for wall collision check
self.layout_tiles_wc = np.pad(layout_tiles, ((0, 1), (0, 1)), mode='constant', constant_values=0)
+ # precomputed wall-segment lookup table for qpos_is_valid
+ self.walls_per_tile, self.n_walls_per_tile = self._precompute_wall_segments_fast()
# mover configuration
self.num_movers = num_movers
@@ -401,37 +403,68 @@ def check_mover_collision(
assert mover_qpos.shape == (num_movers, 7)
adjusted_c_size = c_size + self.c_size_offset * int(add_safety_offset)
- c_size_arr = self.get_c_size_arr(c_size=adjusted_c_size, num_reps=self.num_movers)
-
- if num_movers == self.num_movers:
- i_idx, j_idx = self._triu_indices
- else:
- i_idx, j_idx = np.triu_indices(num_movers, k=1)
qpos = mover_qpos[:, :2]
- qpos_i = qpos[i_idx]
- qpos_j = qpos[j_idx]
- delta = qpos_i - qpos_j
- size_i = c_size_arr[i_idx]
- size_j = c_size_arr[j_idx]
+
+ if self.c_shape == 'circle':
+ _max_interaction_dist = 2.0 * float(np.max(adjusted_c_size))
+ else: # 'box'
+ if np.ndim(adjusted_c_size) == 1:
+ # Uniform half-extents (hx, hy): bounding radius = sqrt(hx^2 + hy^2).
+ _max_interaction_dist = 2.0 * float(np.hypot(adjusted_c_size[0], adjusted_c_size[1]))
+ else:
+ # Per-mover half-extents, shape (num_movers, 2).
+ _max_interaction_dist = 2.0 * float(np.max(np.hypot(adjusted_c_size[:, 0], adjusted_c_size[:, 1])))
+
+ i_idx, j_idx = self._build_spatial_pairs(qpos, _max_interaction_dist)
+
+ if len(i_idx) == 0:
+ return np.bool_(False)
+
+ # Compute pairwise displacement for candidate pairs only.
+ delta = qpos[i_idx] - qpos[j_idx]
if self.c_shape == 'circle':
dist_sq = delta[:, 0] ** 2 + delta[:, 1] ** 2
- radius_sum = (size_i.flatten() + size_j.flatten()) ** 2
- return np.any(dist_sq <= radius_sum)
+ # Uniform-size fast path: when all movers share the same radius, the collision
+ # threshold is a scalar (2r)^2
+ if np.ndim(adjusted_c_size) == 0:
+ return np.any(dist_sq <= (2.0 * float(adjusted_c_size)) ** 2)
+ c_size_arr = self.get_c_size_arr(c_size=adjusted_c_size, num_reps=self.num_movers)
+ size_i = c_size_arr[i_idx]
+ size_j = c_size_arr[j_idx]
+ return np.any(dist_sq <= (size_i.flatten() + size_j.flatten()) ** 2)
elif self.c_shape == 'box':
- dist = np.linalg.norm(delta, axis=1)
- diag_size = np.linalg.norm(np.maximum(size_i, size_j) * 2, ord=1, axis=1)
- mask = dist <= diag_size
+ dist_sq = delta[:, 0] ** 2 + delta[:, 1] ** 2
+ # Uniform-size fast path: when adjusted_c_size has shape (2,) all movers share the
+ # same half-extents, so the pre-filter threshold is a scalar
+ uniform_box = hasattr(adjusted_c_size, 'shape') and adjusted_c_size.shape == (2,)
+ if uniform_box:
+ diag_size_sq = ((float(adjusted_c_size[0]) + float(adjusted_c_size[1])) * 2) ** 2
+ mask = dist_sq <= diag_size_sq
+ else:
+ c_size_arr = self.get_c_size_arr(c_size=adjusted_c_size, num_reps=self.num_movers)
+ size_i = c_size_arr[i_idx]
+ size_j = c_size_arr[j_idx]
+ max_sizes = np.maximum(size_i, size_j)
+ diag_size_sq = ((max_sizes[:, 0] + max_sizes[:, 1]) * 2) ** 2
+ mask = dist_sq <= diag_size_sq
if not mask.any():
return np.bool_(False)
qpos_i = mover_qpos[i_idx][mask]
qpos_j = mover_qpos[j_idx][mask]
- size_i = size_i[mask]
- size_j = size_j[mask]
+ if uniform_box:
+ # Broadcast a single (1, 2) half-size row across all candidates
+ half_sizes = adjusted_c_size.reshape(1, 2)
+ size_cand = np.broadcast_to(half_sizes, (int(mask.sum()), 2))
+ size_i = size_cand
+ size_j = size_cand
+ else:
+ size_i = size_i[mask]
+ size_j = size_j[mask]
collisions = geometry_2D_utils.check_rectangles_intersect(
qpos_r1=qpos_i,
@@ -445,6 +478,102 @@ def check_mover_collision(
else:
raise ValueError('Unsupported collision shape.')
+ def _build_spatial_pairs(
+ self,
+ qpos_xy: np.ndarray,
+ max_interaction_dist: float,
+ ) -> tuple[np.ndarray, np.ndarray]:
+ """Return candidate mover-pair indices using the tile grid as a spatial hash.
+
+ Instead of the O(n^2) all-pairs approach (where n is the number of movers), each mover
+ is assigned to the tile its centre falls in via the same floor-division used in
+ :meth:`qpos_is_valid`, and only pairs whose tiles lie within the interaction neighbourhood
+ are returned. This reduces pair count from O(n^2) to O(n * k) where k is the mean number
+ of movers in the neighbourhood.
+
+ :param qpos_xy: (n, 2) array of mover (x, y) positions (n = number of movers).
+ :param max_interaction_dist: conservative upper bound on the centre-to-centre
+ distance at which two movers can possibly collide.
+ :return: ``(i_idx, j_idx)`` flat arrays of mover indices such that every entry
+ ``(i_idx[k], j_idx[k])`` is a unique unordered candidate pair within the
+ neighbourhood. Every potentially colliding pair is guaranteed to appear
+ exactly once.
+ """
+ n = qpos_xy.shape[0]
+
+ tile_step_x = 2.0 * self.tile_size[0]
+ tile_step_y = 2.0 * self.tile_size[1]
+ tile_origin_x = self.x_pos_tiles[0, 0] - self.tile_size[0]
+ tile_origin_y = self.y_pos_tiles[0, 0] - self.tile_size[1]
+
+ # Neighbourhood radius
+ R_x = int(max_interaction_dist / tile_step_x) + 1
+ R_y = int(max_interaction_dist / tile_step_y) + 1
+
+ # Assign each mover to a tile.
+ ti = np.floor((qpos_xy[:, 0] - tile_origin_x) / tile_step_x).astype(np.intp)
+ tj = np.floor((qpos_xy[:, 1] - tile_origin_y) / tile_step_y).astype(np.intp)
+ ti = np.clip(ti, 0, self.num_tiles_x - 1)
+ tj = np.clip(tj, 0, self.num_tiles_y - 1)
+
+ stride = self.num_tiles_y + R_y
+ tile_flat = (ti * stride + tj).astype(np.intp)
+
+ # Sort movers by flat tile index to enable O(n log n) neighbour lookup.
+ sorted_order = np.argsort(tile_flat, kind='stable')
+ sorted_flat = tile_flat[sorted_order]
+
+ all_a: list[np.ndarray] = []
+ all_b: list[np.ndarray] = []
+
+ # Enumerate the canonical upper-half of tile offsets so that each unordered
+ # pair of distinct tiles is visited exactly once
+ for di in range(0, R_x + 1):
+ dj_start = 1 if di == 0 else -R_y
+ for dj in range(dj_start, R_y + 1):
+ offset = np.intp(di * stride + dj)
+ # For each mover (in sorted order), find movers in tile t + (di, dj).
+ target = sorted_flat + offset
+ lo = np.searchsorted(sorted_flat, target, side='left')
+ hi = np.searchsorted(sorted_flat, target, side='right')
+ counts = (hi - lo).astype(np.intp)
+ total = int(counts.sum())
+ if total == 0:
+ continue
+
+ # Vectorised range expansion: build a-indices (repeated per partner count)
+ # and b-indices (lo[a] + local offset within the group) with no Python
+ # loop over individual movers.
+ cumul = np.empty(n + 1, dtype=np.intp)
+ cumul[0] = 0
+ np.cumsum(counts, out=cumul[1:])
+ a_s = np.repeat(np.arange(n, dtype=np.intp), counts)
+ group_local = np.arange(total, dtype=np.intp) - np.repeat(cumul[:-1], counts)
+ b_s = np.repeat(lo, counts) + group_local
+ all_a.append(sorted_order[a_s])
+ all_b.append(sorted_order[b_s])
+
+ # For each mover at sorted position i, pair it with every mover j > i in
+ # the same tile — ensures each unordered intra-tile pair appears exactly once.
+ hi_same = np.searchsorted(sorted_flat, sorted_flat, side='right')
+ adj_lo = np.arange(n, dtype=np.intp) + 1 # i+1 guarantees i < j in sorted order
+ counts_s = np.maximum(hi_same - adj_lo, 0).astype(np.intp)
+ total_s = int(counts_s.sum())
+ if total_s > 0:
+ cumul_s = np.empty(n + 1, dtype=np.intp)
+ cumul_s[0] = 0
+ np.cumsum(counts_s, out=cumul_s[1:])
+ a_s2 = np.repeat(np.arange(n, dtype=np.intp), counts_s)
+ group_local_s = np.arange(total_s, dtype=np.intp) - np.repeat(cumul_s[:-1], counts_s)
+ b_s2 = np.repeat(adj_lo, counts_s) + group_local_s
+ all_a.append(sorted_order[a_s2])
+ all_b.append(sorted_order[b_s2])
+
+ if not all_a:
+ return np.empty(0, dtype=np.intp), np.empty(0, dtype=np.intp)
+
+ return np.concatenate(all_a), np.concatenate(all_b)
+
def check_wall_collision(
self,
mover_names: list[str],
@@ -512,22 +641,20 @@ def qpos_is_valid(self, qpos: np.ndarray, c_size: float | np.ndarray, add_safety
ignore_orientation = self.c_shape == 'circle'
layout = self.layout_tiles
- layout_wc = self.layout_tiles_wc
x_pos_tiles = self.x_pos_tiles
y_pos_tiles = self.y_pos_tiles
tile_size_x, tile_size_y = self.tile_size[:2]
+ # Fast path: all tiles present — only the four global border walls matter
if np.all(layout == 1):
qpos_x = qpos[:, 0]
qpos_y = qpos[:, 1]
-
min_x_bound = x_pos_tiles[0, 0] - tile_size_x
max_x_bound = x_pos_tiles[-1, -1] + tile_size_x
min_y_bound = y_pos_tiles[0, 0] - tile_size_y
max_y_bound = y_pos_tiles[-1, -1] + tile_size_y
-
if ignore_orientation:
- pos_is_valid = (
+ return (
(min_x_bound < qpos_x - c_size_arr[:, 0])
& (qpos_x + c_size_arr[:, 0] < max_x_bound)
& (min_y_bound < qpos_y - c_size_arr[:, 0])
@@ -535,309 +662,252 @@ def qpos_is_valid(self, qpos: np.ndarray, c_size: float | np.ndarray, add_safety
).astype(int)
else:
mover_vertices = geometry_2D_utils.get_2D_rect_vertices(qpos=qpos, size=c_size_arr)
- pos_is_valid = (
- (min_x_bound < mover_vertices[:, 0, :]).all(axis=1)
- & (mover_vertices[:, 0, :] < max_x_bound).all(axis=1)
- & (min_y_bound < mover_vertices[:, 1, :]).all(axis=1)
- & (mover_vertices[:, 1, :] < max_y_bound).all(axis=1)
+
+ vx = mover_vertices[:, 0, :] # (n, 4) x-coords of vertices
+ vy = mover_vertices[:, 1, :] # (n, 4) y-coords of vertices
+ return (
+ (min_x_bound < vx.min(axis=1))
+ & (vx.max(axis=1) < max_x_bound)
+ & (min_y_bound < vy.min(axis=1))
+ & (vy.max(axis=1) < max_y_bound)
).astype(int)
- return pos_is_valid
+ # Locate each mover's tile via floor division
+ tile_step_x = 2.0 * tile_size_x
+ tile_step_y = 2.0 * tile_size_y
+ tile_origin_x = x_pos_tiles[0, 0] - tile_size_x
+ tile_origin_y = y_pos_tiles[0, 0] - tile_size_y
- # collision shape == 'box': get mover vertices
- if not ignore_orientation:
- mover_vertices = geometry_2D_utils.get_2D_rect_vertices(qpos=qpos, size=c_size_arr)
+ tile_i_f = (qpos[:, 0] - tile_origin_x) / tile_step_x
+ tile_j_f = (qpos[:, 1] - tile_origin_y) / tile_step_y
- # start test
- pos_is_valid = np.zeros(num_qpos, dtype=np.int8)
+ in_bounds = (tile_i_f >= 0.0) & (tile_i_f < self.num_tiles_x) & (tile_j_f >= 0.0) & (tile_j_f < self.num_tiles_y)
- # roughly locate the movers -> find the indices of tiles with a mover above them
- qpos_x_all = qpos[:, 0, np.newaxis, np.newaxis]
- qpos_y_all = qpos[:, 1, np.newaxis, np.newaxis]
+ tile_i = np.clip(np.floor(tile_i_f).astype(np.intp), 0, self.num_tiles_x - 1)
+ tile_j = np.clip(np.floor(tile_j_f).astype(np.intp), 0, self.num_tiles_y - 1)
- mask_above_tile = (
- (x_pos_tiles - tile_size_x <= qpos_x_all)
- & (qpos_x_all <= x_pos_tiles + tile_size_x)
- & (y_pos_tiles - tile_size_y <= qpos_y_all)
- & (qpos_y_all <= y_pos_tiles + tile_size_y)
- )
+ # Positions above absent tiles are immediately invalid
+ pos_is_valid = in_bounds & (layout[tile_i, tile_j] == 1)
- assert np.sum(mask_above_tile) >= num_qpos, (
- 'At least one mover is not above a tile. An episode should be terminated in case of wall collision. '
- + 'This error is probably caused by a missed termination of the episode.'
- )
- idx_qpos, idx_tiles_x, idx_tiles_y = np.nonzero(mask_above_tile)
- if not ignore_orientation:
- mover_vertices = mover_vertices[idx_qpos, :, :]
- # min, max x pos of all relevant tiles
- min_x_tiles = x_pos_tiles[idx_tiles_x, idx_tiles_y] - tile_size_x
- max_x_tiles = x_pos_tiles[idx_tiles_x, idx_tiles_y] + tile_size_x
- # min, max y pos of all relevant tiles
- min_y_tiles = y_pos_tiles[idx_tiles_x, idx_tiles_y] - tile_size_y
- max_y_tiles = y_pos_tiles[idx_tiles_x, idx_tiles_y] + tile_size_y
-
- # check whether the tiles are completely surrounded by other tiles
- # mask_complete.shape == (num_qpos,self.idx_x_tiles_3x3.shape[0])
- mask_complete = (idx_tiles_x[:, np.newaxis] == self.idx_x_tiles_3x3[np.newaxis, :]) * (
- idx_tiles_y[:, np.newaxis] == self.idx_y_tiles_3x3[np.newaxis, :]
- )
- idx_qpos_complete = idx_qpos[np.where(mask_complete)[0]]
- pos_is_valid[idx_qpos_complete] = 1
- if np.sum(pos_is_valid) == num_qpos:
- return pos_is_valid
-
- # at least one pos is above a tile which is not completely surrounded by other tiles
- # (possibly without required safety margin to the edges of the tile)
- # safe = above_tile and all distances to edges > safety margin
- if ignore_orientation:
- rep = 1
- layout_tiles_expanded = layout[idx_tiles_x, idx_tiles_y][:, np.newaxis]
- qpos_x_expanded = qpos[idx_qpos, 0][:, np.newaxis]
- qpos_y_expanded = qpos[idx_qpos, 1][:, np.newaxis]
- c_size_expanded = c_size_arr[idx_qpos, :]
- min_x_tiles_expanded = min_x_tiles[:, np.newaxis]
- max_x_tiles_expanded = max_x_tiles[:, np.newaxis]
- min_y_tiles_expanded = min_y_tiles[:, np.newaxis]
- max_y_tiles_expanded = max_y_tiles[:, np.newaxis]
-
- min_x_safe = layout_tiles_expanded * (min_x_tiles_expanded < qpos_x_expanded - c_size_expanded).astype(np.int8)
- max_x_safe = layout_tiles_expanded * (qpos_x_expanded + c_size_expanded < max_x_tiles_expanded).astype(np.int8)
- min_y_safe = layout_tiles_expanded * (min_y_tiles_expanded < qpos_y_expanded - c_size_expanded).astype(np.int8)
- max_y_safe = layout_tiles_expanded * (qpos_y_expanded + c_size_expanded < max_y_tiles_expanded).astype(np.int8)
- else:
- rep = 4
- layout_tiles_broadcast = layout[idx_tiles_x, idx_tiles_y][:, np.newaxis]
- min_x_tiles_broadcast = min_x_tiles[:, np.newaxis]
- max_x_tiles_broadcast = max_x_tiles[:, np.newaxis]
- min_y_tiles_broadcast = min_y_tiles[:, np.newaxis]
- max_y_tiles_broadcast = max_y_tiles[:, np.newaxis]
-
- min_x_safe = layout_tiles_broadcast * (min_x_tiles_broadcast < mover_vertices[:, 0, :]).astype(np.int8)
- max_x_safe = layout_tiles_broadcast * (mover_vertices[:, 0, :] < max_x_tiles_broadcast).astype(np.int8)
- min_y_safe = layout_tiles_broadcast * (min_y_tiles_broadcast < mover_vertices[:, 1, :]).astype(np.int8)
- max_y_safe = layout_tiles_broadcast * (mover_vertices[:, 1, :] < max_y_tiles_broadcast).astype(np.int8)
-
- # mask minimum and maximum indices
- mask_idx_x_lmin = (idx_tiles_x > 0).astype(np.int8)
- mask_idx_y_lmin = (idx_tiles_y > 0).astype(np.int8)
- mask_idx_x_smax = (idx_tiles_x < self.num_tiles_x - 1).astype(np.int8)
- mask_idx_y_smax = (idx_tiles_y < self.num_tiles_y - 1).astype(np.int8)
-
- mask_valid = (min_x_safe * max_x_safe * min_y_safe * max_y_safe).astype(np.int8)
-
- # update min_x_safe
- mask_min_x_update_base = (mask_idx_x_lmin * layout_wc[idx_tiles_x, idx_tiles_y] * layout_wc[idx_tiles_x - 1, idx_tiles_y])[
- :, np.newaxis
- ]
- mask_min_x_update = (1 - min_x_safe) * mask_min_x_update_base
- mask_valid = mask_valid + mask_min_x_update * min_y_safe * max_y_safe
- # update min_y_safe based on min_x_safe-update
- mask_min_x_min_y_update = (1 - min_y_safe) * np.tile(
- mask_idx_x_lmin
- * mask_idx_y_lmin
- * layout_wc[idx_tiles_x, idx_tiles_y]
- * layout_wc[idx_tiles_x, idx_tiles_y - 1]
- * layout_wc[idx_tiles_x - 1, idx_tiles_y - 1],
- reps=(rep, 1),
- ).T
- mask_valid = mask_valid + mask_min_x_update * mask_min_x_min_y_update
- # update max_y_safe based on min_x_safe-update
- mask_min_x_max_y_update_base = (
- mask_idx_x_lmin
- * mask_idx_y_smax
- * layout_wc[idx_tiles_x, idx_tiles_y]
- * layout_wc[idx_tiles_x, idx_tiles_y + 1]
- * layout_wc[idx_tiles_x - 1, idx_tiles_y + 1]
- )[:, np.newaxis]
- mask_min_x_max_y_update = (1 - max_y_safe) * mask_min_x_max_y_update_base
- mask_valid = mask_valid + mask_min_x_update * mask_min_x_max_y_update
-
- # update max_x_safe
- mask_max_x_update_base = (mask_idx_x_smax * layout_wc[idx_tiles_x, idx_tiles_y] * layout_wc[idx_tiles_x + 1, idx_tiles_y])[
- :, np.newaxis
- ]
- mask_max_x_update = (1 - max_x_safe) * mask_max_x_update_base
- mask_valid = mask_valid + mask_max_x_update * min_y_safe * max_y_safe
- # update min_y_safe based on max_x_safe-update
- mask_max_x_min_y_update_base = (
- mask_idx_x_smax
- * mask_idx_y_lmin
- * layout_wc[idx_tiles_x, idx_tiles_y]
- * layout_wc[idx_tiles_x, idx_tiles_y - 1]
- * layout_wc[idx_tiles_x + 1, idx_tiles_y - 1]
- )[:, np.newaxis]
- mask_max_x_min_y_update = (1 - min_y_safe) * mask_max_x_min_y_update_base
- mask_valid = mask_valid + mask_max_x_update * mask_max_x_min_y_update
- # update max_y_safe based on max_x_safe-update
- mask_max_x_max_y_update_base = (
- mask_idx_x_smax
- * mask_idx_y_smax
- * layout_wc[idx_tiles_x, idx_tiles_y]
- * layout_wc[idx_tiles_x, idx_tiles_y + 1]
- * layout_wc[idx_tiles_x + 1, idx_tiles_y + 1]
- )[:, np.newaxis]
- mask_max_x_max_y_update = (1 - max_y_safe) * mask_max_x_max_y_update_base
- mask_valid = mask_valid + mask_max_x_update * mask_max_x_max_y_update
-
- # update min_y_safe
- mask_min_y_update_base = (mask_idx_y_lmin * layout_wc[idx_tiles_x, idx_tiles_y] * layout_wc[idx_tiles_x, idx_tiles_y - 1])[
- :, np.newaxis
- ]
- mask_min_y_update = (1 - min_y_safe) * mask_min_y_update_base
- mask_valid = mask_valid + mask_min_y_update * min_x_safe * max_x_safe
+ # Run wall-intersection tests for the remaining candidates
+ candidates = np.where(pos_is_valid)[0]
+ if len(candidates) == 0:
+ return pos_is_valid.astype(int)
- # update max_y_safe
- mask_max_y_update_base = (mask_idx_y_smax * layout_wc[idx_tiles_x, idx_tiles_y] * layout_wc[idx_tiles_x, idx_tiles_y + 1])[
- :, np.newaxis
- ]
- mask_max_y_update = (1 - max_y_safe) * mask_max_y_update_base
- mask_valid = mask_valid + mask_max_y_update * min_x_safe * max_x_safe
+ cand_ti = tile_i[candidates]
+ cand_tj = tile_j[candidates]
- assert np.bitwise_or(mask_valid == 0, mask_valid == 1).all()
+ # Movers whose tile neighbourhood has no wall segments are unconditionally valid
+ n_walls = self.n_walls_per_tile[cand_ti, cand_tj]
+ has_walls = n_walls > 0
+ wall_cand_local = np.where(has_walls)[0] # indices into `candidates`
- if ignore_orientation:
- mask_valid = mask_valid.flatten()
- else:
- mask_valid = np.sum(mask_valid, axis=1) == 4
-
- idx_tiles_x_expanded = idx_tiles_x[:, np.newaxis]
- idx_tiles_y_expanded = idx_tiles_y[:, np.newaxis]
- tile_size_2d = self.tile_size[:2]
-
- # bottom left
- mask_2x2_bl_base = (mask_valid * mask_idx_x_smax * mask_idx_y_lmin)[:, np.newaxis]
- mask_2x2_bl = (
- mask_2x2_bl_base
- * (idx_tiles_x_expanded == self.idx_x_tiles_2x2_bl[np.newaxis, :])
- * (idx_tiles_y_expanded == self.idx_y_tiles_2x2_bl[np.newaxis, :] + 1)
- )
- sum_bl = np.sum(mask_2x2_bl, axis=1)
- assert np.bitwise_or(sum_bl == 0, sum_bl == 1).all()
- idx_qpos_bl = idx_qpos[sum_bl == 1]
- if len(idx_qpos_bl) > 0:
- qpos_missing_tiles = np.zeros((idx_qpos_bl.shape[0], 7), dtype=np.float64)
- qpos_missing_tiles[:, 3] = 1.0
+ if len(wall_cand_local) > 0:
+ wall_cands = candidates[wall_cand_local]
+ wi = cand_ti[wall_cand_local]
+ wj = cand_tj[wall_cand_local]
- idx_mask_bl = np.where(mask_2x2_bl)
+ # Gather precomputed wall segments: shape (n, max_walls, 4)
+ walls = self.walls_per_tile[wi, wj]
+ nw = n_walls[wall_cand_local]
- x_pos_values = self.x_pos_tiles[self.idx_x_tiles_2x2_bl + 1, self.idx_y_tiles_2x2_bl]
- y_pos_values = self.y_pos_tiles[self.idx_x_tiles_2x2_bl + 1, self.idx_y_tiles_2x2_bl]
+ if ignore_orientation:
+ intersects = self._circle_walls_intersect_fast(
+ centers=qpos[wall_cands, :2],
+ radii=c_size_arr[wall_cands],
+ walls=walls,
+ n_walls=nw,
+ )
+ else:
+ intersects = self._box_walls_intersect_fast(
+ qpos=qpos[wall_cands],
+ sizes=c_size_arr[wall_cands],
+ walls=walls,
+ n_walls=nw,
+ )
- # Use advanced indexing instead of broadcasting large arrays
- qpos_missing_tiles[:, 0] = x_pos_values[idx_mask_bl[1]]
- qpos_missing_tiles[:, 1] = y_pos_values[idx_mask_bl[1]]
+ pos_is_valid[wall_cands[intersects]] = False
- tile_sizes_array = np.tile(tile_size_2d, (idx_qpos_bl.shape[0], 1))
+ assert isinstance(pos_is_valid, np.ndarray) and pos_is_valid.shape == (num_qpos,)
+ return pos_is_valid.astype(int)
- mt_intersect = geometry_2D_utils.check_rectangles_intersect(
- qpos_r1=qpos[idx_qpos_bl, :],
- qpos_r2=qpos_missing_tiles,
- size_r1=c_size_arr[idx_qpos_bl, :],
- size_r2=tile_sizes_array,
- )
- mask_valid[idx_mask_bl[0]] = (1 - mt_intersect) * mask_valid[idx_mask_bl[0]]
-
- # bottom right
- mask_2x2_br_base = (mask_valid * mask_idx_x_smax * mask_idx_y_smax)[:, np.newaxis]
- mask_2x2_br = (
- mask_2x2_br_base
- * (idx_tiles_x_expanded == self.idx_x_tiles_2x2_br[np.newaxis, :])
- * (idx_tiles_y_expanded == self.idx_y_tiles_2x2_br[np.newaxis, :])
- )
- sum_br = np.sum(mask_2x2_br, axis=1)
- assert np.bitwise_or(sum_br == 0, sum_br == 1).all()
- idx_qpos_br = idx_qpos[sum_br == 1]
- if len(idx_qpos_br) > 0:
- qpos_missing_tiles = np.zeros((idx_qpos_br.shape[0], 7), dtype=np.float64)
- qpos_missing_tiles[:, 3] = 1.0
-
- idx_mask_br = np.where(mask_2x2_br)
- x_pos_values = self.x_pos_tiles[self.idx_x_tiles_2x2_br + 1, self.idx_y_tiles_2x2_br + 1]
- y_pos_values = self.y_pos_tiles[self.idx_x_tiles_2x2_br + 1, self.idx_y_tiles_2x2_br + 1]
-
- qpos_missing_tiles[:, 0] = x_pos_values[idx_mask_br[1]]
- qpos_missing_tiles[:, 1] = y_pos_values[idx_mask_br[1]]
-
- tile_sizes_array = np.tile(tile_size_2d, (idx_qpos_br.shape[0], 1))
-
- mt_intersect = geometry_2D_utils.check_rectangles_intersect(
- qpos_r1=qpos[idx_qpos_br, :],
- qpos_r2=qpos_missing_tiles,
- size_r1=c_size_arr[idx_qpos_br, :],
- size_r2=tile_sizes_array,
- )
- mask_valid[idx_mask_br[0]] = (1 - mt_intersect) * mask_valid[idx_mask_br[0]]
-
- # top left
- mask_2x2_tl_base = (mask_valid * mask_idx_x_lmin * mask_idx_y_lmin)[:, np.newaxis]
- mask_2x2_tl = (
- mask_2x2_tl_base
- * (idx_tiles_x_expanded == self.idx_x_tiles_2x2_tl[np.newaxis, :] + 1)
- * (idx_tiles_y_expanded == self.idx_y_tiles_2x2_tl[np.newaxis, :] + 1)
- )
- sum_tl = np.sum(mask_2x2_tl, axis=1)
- assert np.bitwise_or(sum_tl == 0, sum_tl == 1).all()
- idx_qpos_tl = idx_qpos[sum_tl == 1]
- if len(idx_qpos_tl) > 0:
- qpos_missing_tiles = np.zeros((idx_qpos_tl.shape[0], 7), dtype=np.float64)
- qpos_missing_tiles[:, 3] = 1.0
-
- idx_mask_tl = np.where(mask_2x2_tl)
- x_pos_values = self.x_pos_tiles[self.idx_x_tiles_2x2_tl, self.idx_y_tiles_2x2_tl]
- y_pos_values = self.y_pos_tiles[self.idx_x_tiles_2x2_tl, self.idx_y_tiles_2x2_tl]
-
- qpos_missing_tiles[:, 0] = x_pos_values[idx_mask_tl[1]]
- qpos_missing_tiles[:, 1] = y_pos_values[idx_mask_tl[1]]
-
- tile_sizes_array = np.tile(tile_size_2d, (idx_qpos_tl.shape[0], 1))
-
- mt_intersect = geometry_2D_utils.check_rectangles_intersect(
- qpos_r1=qpos[idx_qpos_tl, :],
- qpos_r2=qpos_missing_tiles,
- size_r1=c_size_arr[idx_qpos_tl, :],
- size_r2=tile_sizes_array,
- )
- mask_valid[idx_mask_tl[0]] = (1 - mt_intersect) * mask_valid[idx_mask_tl[0]]
-
- # top right
- mask_2x2_tr_base = (mask_valid * mask_idx_x_lmin * mask_idx_y_smax)[:, np.newaxis]
- mask_2x2_tr = (
- mask_2x2_tr_base
- * (idx_tiles_x_expanded == self.idx_x_tiles_2x2_tr[np.newaxis, :] + 1)
- * (idx_tiles_y_expanded == self.idx_y_tiles_2x2_tr[np.newaxis, :])
- )
- sum_tr = np.sum(mask_2x2_tr, axis=1)
- assert np.bitwise_or(sum_tr == 0, sum_tr == 1).all()
- idx_qpos_tr = idx_qpos[sum_tr == 1]
- if len(idx_qpos_tr) > 0:
- qpos_missing_tiles = np.zeros((idx_qpos_tr.shape[0], 7), dtype=np.float64)
- qpos_missing_tiles[:, 3] = 1.0
+ @staticmethod
+ def _circle_walls_intersect_fast(
+ centers: np.ndarray,
+ radii: np.ndarray,
+ walls: np.ndarray,
+ n_walls: np.ndarray,
+ ) -> np.ndarray:
+ """Return True for each mover whose circle intersects at least one wall segment.
- idx_mask_tr = np.where(mask_2x2_tr)
+ The distance from the circle centre to each axis-aligned wall segment is computed in
+ closed form using the standard point-to-segment projection formula, vectorised over
+ the (num_movers, max_walls) pair space.
- x_pos_values = self.x_pos_tiles[self.idx_x_tiles_2x2_tr, self.idx_y_tiles_2x2_tr + 1]
- y_pos_values = self.y_pos_tiles[self.idx_x_tiles_2x2_tr, self.idx_y_tiles_2x2_tr + 1]
+ :param centers: (n, 2) circle centres, where n is the number of movers
+ :param radii: (n, 1) circle radii
+ :param walls: (n, max_walls, 4) wall segments as (x1, y1, x2, y2)
+ :param n_walls: (n,) number of valid wall entries per mover (remainder are padding)
+ :return: (n,) bool array, True where at least one wall is intersected
+ """
+ px = centers[:, 0:1] # (n, 1)
+ py = centers[:, 1:2]
- qpos_missing_tiles[:, 0] = x_pos_values[idx_mask_tr[1]]
- qpos_missing_tiles[:, 1] = y_pos_values[idx_mask_tr[1]]
+ x1 = walls[:, :, 0] # (n, max_walls)
+ y1 = walls[:, :, 1]
+ x2 = walls[:, :, 2]
+ y2 = walls[:, :, 3]
- tile_sizes_array = np.tile(tile_size_2d, (idx_qpos_tr.shape[0], 1))
+ dx = x2 - x1
+ dy = y2 - y1
+ len_sq = dx * dx + dy * dy # always > 0 for tile-edge segments
- mt_intersect = geometry_2D_utils.check_rectangles_intersect(
- qpos_r1=qpos[idx_qpos_tr, :],
- qpos_r2=qpos_missing_tiles,
- size_r1=c_size_arr[idx_qpos_tr, :],
- size_r2=tile_sizes_array,
- )
- mask_valid[idx_mask_tr[0]] = (1 - mt_intersect) * mask_valid[idx_mask_tr[0]]
+ # Parameter t of the closest point on the segment, clamped to [0, 1]
+ t = np.clip(((px - x1) * dx + (py - y1) * dy) / (len_sq + 1e-30), 0.0, 1.0)
- if len(idx_qpos) > 0:
- unique_idx, inverse = np.unique(idx_qpos, return_inverse=True)
- all_valid = np.bincount(inverse) == np.bincount(inverse, weights=mask_valid)
- pos_is_valid[unique_idx[all_valid]] = 1
+ # Squared distance from centre to the closest point on the segment
+ dist_sq = (px - (x1 + t * dx)) ** 2 + (py - (y1 + t * dy)) ** 2 # (n, max_walls)
- assert isinstance(pos_is_valid, np.ndarray) and pos_is_valid.shape == (num_qpos,)
- return pos_is_valid.astype(int)
+ # Only count intersections against walls that actually exist for this mover
+ valid_wall = np.arange(walls.shape[1])[np.newaxis, :] < n_walls[:, np.newaxis] # (n, max_walls)
+
+ return np.any(valid_wall & (dist_sq <= radii**2), axis=1)
+
+ @staticmethod
+ def _box_walls_intersect_fast(
+ qpos: np.ndarray,
+ sizes: np.ndarray,
+ walls: np.ndarray,
+ n_walls: np.ndarray,
+ ) -> np.ndarray:
+ """Return True for each mover whose rotated box intersects at least one wall segment.
+
+ :param qpos: (n, 7) box positions and quaternions (x, y, z, w, qx, qy, qz), where n is the number of movers
+ :param sizes: (n, 2) box half-sizes (hx, hy)
+ :param walls: (n, max_walls, 4) wall segments as (x1, y1, x2, y2)
+ :param n_walls: (n,) number of valid wall entries per mover
+ :return: (n,) bool array, True where at least one wall is intersected
+ """
+ # Box centre and half-sizes, shaped for (n, max_walls) broadcasting
+ cx = qpos[:, 0, np.newaxis] # (n, 1)
+ cy = qpos[:, 1, np.newaxis]
+ hx = sizes[:, 0, np.newaxis]
+ hy = sizes[:, 1, np.newaxis]
+
+ # Yaw angle extracted from quaternion stored as (w, qx, qy, qz) at indices 3-6
+ w_q = qpos[:, 3]
+ q_x = qpos[:, 4]
+ q_y = qpos[:, 5]
+ q_z = qpos[:, 6]
+ yaw = np.arctan2(2.0 * (w_q * q_z + q_x * q_y), 1.0 - 2.0 * (q_y * q_y + q_z * q_z))
+ cos_t = np.cos(yaw)[:, np.newaxis] # (n, 1)
+ sin_t = np.sin(yaw)[:, np.newaxis]
+
+ x1 = walls[:, :, 0] # (n, max_walls)
+ y1 = walls[:, :, 1]
+ x2 = walls[:, :, 2]
+ y2 = walls[:, :, 3]
+
+ # Mask for padding slots that do not correspond to real wall segments
+ valid_wall = np.arange(walls.shape[1])[np.newaxis, :] < n_walls[:, np.newaxis] # (n, max_walls)
+
+ # Axis 1: global x (1, 0)
+ # Box AABB half-extent along x: |hx·cos| + |hy·sin|
+ Rx = np.abs(hx * cos_t) + np.abs(hy * sin_t)
+ sep_x = (cx + Rx < np.minimum(x1, x2)) | (cx - Rx > np.maximum(x1, x2))
+
+ # Axis 2: global y (0, 1)
+ Ry = np.abs(hx * sin_t) + np.abs(hy * cos_t)
+ sep_y = (cy + Ry < np.minimum(y1, y2)) | (cy - Ry > np.maximum(y1, y2))
+
+ # Axis 3: box local x = (cos_t, sin_t)
+ # Project both segment endpoints onto this axis, relative to the box centre
+ p1_ax = (x1 - cx) * cos_t + (y1 - cy) * sin_t
+ p2_ax = (x2 - cx) * cos_t + (y2 - cy) * sin_t
+ sep_ax = (np.maximum(p1_ax, p2_ax) < -hx) | (np.minimum(p1_ax, p2_ax) > hx)
+
+ # Axis 4: box local y = (-sin_t, cos_t)
+ p1_ay = -(x1 - cx) * sin_t + (y1 - cy) * cos_t
+ p2_ay = -(x2 - cx) * sin_t + (y2 - cy) * cos_t
+ sep_ay = (np.maximum(p1_ay, p2_ay) < -hy) | (np.minimum(p1_ay, p2_ay) > hy)
+
+ # Separated on any axis -> no intersection for that (mover, wall) pair
+ no_intersect = sep_x | sep_y | sep_ax | sep_ay
+
+ return np.any(valid_wall & ~no_intersect, axis=1)
+
+ def _precompute_wall_segments_fast(self) -> tuple[np.ndarray, np.ndarray]:
+ """Precompute a per-tile lookup table of wall segments for qpos_is_valid.
+
+ A wall segment is an axis-aligned line segment along the edge of a present tile that
+ faces either an absent tile or the layout border. For each present tile (i, j), every
+ wall segment within its 3 x 3 tile neighbourhood is collected and packed into a padded
+ array, enabling O(1) retrieval at runtime.
+
+ :return: a tuple (walls_per_tile, n_walls_per_tile) where
+
+ - walls_per_tile has shape (num_tiles_x, num_tiles_y, max_walls, 4), storing
+ wall segments as (x1, y1, x2, y2); unused slots are zero-filled.
+ - n_walls_per_tile has shape (num_tiles_x, num_tiles_y) and holds the count of
+ valid wall segments per tile.
+ """
+ ts_x, ts_y = self.tile_size[:2]
+ num_x, num_y = self.num_tiles_x, self.num_tiles_y
+ layout = self.layout_tiles
+
+ # Collect the wall segments owned by each individual present tile.
+ # A segment is added for each edge that borders an absent tile or the grid border.
+ tile_walls: dict[tuple[int, int], list[tuple[float, float, float, float]]] = {}
+ for i in range(num_x):
+ for j in range(num_y):
+ if layout[i, j] == 0:
+ continue
+ x = self.x_pos_tiles[i, j]
+ y = self.y_pos_tiles[i, j]
+ x_l, x_r = x - ts_x, x + ts_x
+ y_b, y_t = y - ts_y, y + ts_y
+ segs: list[tuple[float, float, float, float]] = []
+ if i == 0 or layout[i - 1, j] == 0:
+ segs.append((x_l, y_b, x_l, y_t)) # left wall (vertical)
+ if i == num_x - 1 or layout[i + 1, j] == 0:
+ segs.append((x_r, y_b, x_r, y_t)) # right wall (vertical)
+ if j == 0 or layout[i, j - 1] == 0:
+ segs.append((x_l, y_b, x_r, y_b)) # bottom wall (horizontal)
+ if j == num_y - 1 or layout[i, j + 1] == 0:
+ segs.append((x_l, y_t, x_r, y_t)) # top wall (horizontal)
+ if segs:
+ tile_walls[(i, j)] = segs
+
+ # For each present tile, merge the wall segments from its 3 x 3 neighbourhood.
+ # A mover centred in tile (i, j) can reach at most one tile in any direction, so the
+ # 3 x 3 window captures all walls it could possibly intersect.
+ max_walls = 0
+ neighbourhood_walls: dict[tuple[int, int], list[tuple[float, float, float, float]]] = {}
+ for i in range(num_x):
+ for j in range(num_y):
+ if layout[i, j] == 0:
+ continue
+ merged: list[tuple[float, float, float, float]] = []
+ for di in (-1, 0, 1):
+ for dj in (-1, 0, 1):
+ nb = (i + di, j + dj)
+ if nb in tile_walls:
+ merged.extend(tile_walls[nb])
+ neighbourhood_walls[(i, j)] = merged
+ if len(merged) > max_walls:
+ max_walls = len(merged)
+
+ max_walls = max(max_walls, 1) # guard against single-tile layouts
+
+ walls_per_tile = np.zeros((num_x, num_y, max_walls, 4), dtype=np.float64)
+ n_walls_per_tile = np.zeros((num_x, num_y), dtype=np.int32)
+ for (i, j), segs in neighbourhood_walls.items():
+ n = len(segs)
+ n_walls_per_tile[i, j] = n
+ if n > 0:
+ walls_per_tile[i, j, :n, :] = np.array(segs, dtype=np.float64)
+
+ return walls_per_tile, n_walls_per_tile
###################################################
# MuJoCo #
@@ -861,26 +931,16 @@ def get_mover_qpos(self, mover_names: str | list[str], add_noise: bool = False)
"""
if isinstance(mover_names, str):
mover_names = [mover_names]
- mover_indices = np.array([self.mover_name_to_idx[mover_name] for mover_name in mover_names], dtype=np.int32)
- mover_qpos = self.data.qpos[self.mover_qpos_indices[mover_indices, :]]
- if isinstance(self.mover_shape, list):
- shapes = np.array(self.mover_shape[mover_indices])
- else:
- assert isinstance(self.mover_shape, str)
- shapes = np.array([self.mover_shape] * mover_qpos.shape[0])
-
- if len(self.mover_size.shape) == 2:
- sizes = self.mover_size[mover_indices, :]
+ # Fast path: caller requested all movers
+ if mover_names is self.mover_names:
+ mover_qpos = self.data.qpos[self.mover_qpos_indices]
+ mover_qpos[:, 2] -= self._mover_z_adj
else:
- assert len(self.mover_size.shape) == 1
- sizes = np.broadcast_to(self.mover_size, (mover_qpos.shape[0], self.mover_size.shape[0]))
+ mover_indices = np.array([self.mover_name_to_idx[mover_name] for mover_name in mover_names], dtype=np.int32)
+ mover_qpos = self.data.qpos[self.mover_qpos_indices[mover_indices, :]]
- mask_box_mesh = np.bitwise_or(shapes == 'box', shapes == 'mesh')
- mask_cylinder = shapes == 'cylinder'
-
- mover_qpos[mask_box_mesh, 2] -= sizes[mask_box_mesh, 2]
- mover_qpos[mask_cylinder, 2] -= sizes[mask_cylinder, 1]
+ mover_qpos[:, 2] -= self._mover_z_adj[mover_indices]
if add_noise:
mover_qpos += self.rng_noise.normal(loc=0.0, scale=self.std_noise[0], size=mover_qpos.shape)
@@ -950,6 +1010,32 @@ def update_cached_mover_mujoco_data(self) -> None:
self.mover_qpos_indices = mover_joint_qpos_adrs[:, np.newaxis] + np.arange(qpos_ndim)
self.mover_qvel_qacc_indices = mover_joint_qvel_qacc_adrs[:, np.newaxis] + np.arange(qvel_qacc_ndim)
+ # Precompute the per-mover z-position adjustment applied in get_mover_qpos
+ self._mover_z_adj = self._build_mover_z_adj()
+
+ def _build_mover_z_adj(self) -> np.ndarray:
+ """Precompute per-mover z-position adjustments.
+
+ :return: float64 array of shape (num_movers,) where entry ``i`` is the z offset to subtract from the raw qpos z of mover ``i``
+ (ordered consistently with ``self.mover_names``). For box/mesh movers this is ``size[2]``; for cylinder movers it
+ is ``size[1]``; for any other shape it is ``0``.
+ """
+ if isinstance(self.mover_shape, list):
+ shapes = np.array(self.mover_shape)
+ else:
+ shapes = np.array([self.mover_shape] * self.num_movers)
+
+ if len(self.mover_size.shape) == 2:
+ sizes = self.mover_size # (num_movers, 3)
+ else:
+ sizes = np.broadcast_to(self.mover_size, (self.num_movers, self.mover_size.shape[0]))
+
+ z_adj = np.zeros(self.num_movers, dtype=np.float64)
+ mask_box_mesh = np.bitwise_or(shapes == 'box', shapes == 'mesh')
+ mask_cylinder = shapes == 'cylinder'
+ z_adj[mask_box_mesh] = sizes[mask_box_mesh, 2]
+ z_adj[mask_cylinder] = sizes[mask_cylinder, 1]
+ return z_adj
def _generate_mover_xml_strings(
self,
diff --git a/magbotsim/rl_envs/object_manipulation/pushing/state_based_global_pushing_env.py b/magbotsim/rl_envs/object_manipulation/pushing/state_based_global_pushing_env.py
index f390d0b..9fb4e7a 100644
--- a/magbotsim/rl_envs/object_manipulation/pushing/state_based_global_pushing_env.py
+++ b/magbotsim/rl_envs/object_manipulation/pushing/state_based_global_pushing_env.py
@@ -162,6 +162,14 @@ def __init__(
self.object_start_yaw = 0.0
self.object_goal_yaw = 0.0
+ # Episode-level Shapely cache (learn_pose=True only).
+ # Both the object shape and the desired-goal pose are fixed for the duration
+ # of an episode, so we build these polygons once in _reset_callback and reuse
+ # them every step instead of rebuilding from scratch each time.
+ self._episode_base_poly: sg.MultiPolygon | None = None
+ self._episode_goal_poly: sg.MultiPolygon | None = None
+ self._episode_desired_goal: np.ndarray | None = None
+
self.object_sliding_friction = object_sliding_friction
self.object_torsional_friction = object_torsional_friction
@@ -670,6 +678,21 @@ def _reset_callback(self, options: dict[str, Any] | None = None) -> None:
# reload model with new start pos and goal pos
self.reload_model(mover_start_xy_pos=start_qpos[:, :2])
+ if self.learn_pose:
+ episode_geoms = self._geoms(name='object')
+ self._episode_base_poly = sg.MultiPolygon(
+ [self._geom_to_shapely(episode_geoms[i]) for i in range(episode_geoms.shape[0]) if episode_geoms[i].any()]
+ )
+ self._episode_desired_goal = np.array(
+ [
+ self.object_xy_goal_pos[0],
+ self.object_xy_goal_pos[1],
+ np.sin(self.object_goal_yaw),
+ np.cos(self.object_goal_yaw),
+ ]
+ )
+ self._episode_goal_poly = self._pose_poly(self._episode_base_poly, self._episode_desired_goal)
+
# reset corrective movement measurement
self.cm_measurement.reset()
@@ -843,12 +866,22 @@ def _body_to_shapely(
:param body_name: the name of the MuJoCo body to convert
:return: a Shapely Polygon representing the union of all geometries in the body
"""
+ base_poly = sg.MultiPolygon([self._geom_to_shapely(geoms[i]) for i in range(geoms.shape[0]) if geoms[i].any()])
+ return self._pose_poly(base_poly, pose)
+
+ def _pose_poly(self, base_poly: sg.MultiPolygon, pose: np.ndarray) -> sg.MultiPolygon:
+ """Apply a pose transformation to a base polygon.
+
+ :param base_poly: MultiPolygon in the body's local frame (as returned by _body_to_shapely)
+ :param pose: a numpy array containing [x, y, sin_yaw, cos_yaw]
+ :return: the base polygon rotated and translated to the given world pose
+ """
x, y, sin_yaw, cos_yaw = pose.squeeze()
yaw = np.arctan2(sin_yaw, cos_yaw)
return shapely.affinity.translate(
shapely.affinity.rotate(
- sg.MultiPolygon([self._geom_to_shapely(geoms[i]) for i in range(geoms.shape[0]) if geoms[i].any()]),
+ base_poly,
yaw,
origin='center',
use_radians=True,
@@ -873,6 +906,31 @@ def _object_coverage(
:param object_geoms: a numpy array containing geometry data for the object
:return: the coverage ratio between 0.0 and 1.0, where 1.0 means perfect overlap
"""
+ # Fast path for single-step calls within the current episode.
+ #
+ # Both the object's base shape (geoms) and the desired-goal polygon are
+ # episode-constant. When this method is called with the episode's own goal
+ # (batch_size == 1 and desired_goal matches the cached value) we can skip all
+ # _geom_to_shapely construction and the full goal-polygon build entirely.
+ #
+ # HER compute_reward calls pass varied desired_goals (and potentially geoms
+ # from past episodes), so they fall through to the general path below.
+ if (
+ self._episode_base_poly is not None
+ and achieved_goal.shape[0] == 1
+ and np.array_equal(desired_goal[0], self._episode_desired_goal)
+ ):
+ object_poly = self._pose_poly(self._episode_base_poly, achieved_goal[0])
+ return np.array(
+ [
+ np.clip(
+ self._episode_goal_poly.intersection(object_poly).area / self._episode_goal_poly.area,
+ 0,
+ 1,
+ )
+ ]
+ )
+
object_polys = [self._body_to_shapely(geoms, pose) for geoms, pose in zip(object_geoms, achieved_goal, strict=True)]
goal_polys = [self._body_to_shapely(geoms, pose) for geoms, pose in zip(object_geoms, desired_goal, strict=True)]
diff --git a/magbotsim/utils/benchmark_utils.py b/magbotsim/utils/benchmark_utils.py
index 95962ef..37a9053 100644
--- a/magbotsim/utils/benchmark_utils.py
+++ b/magbotsim/utils/benchmark_utils.py
@@ -96,9 +96,9 @@ class EnergyEfficiencyMeasurement:
"""A utility class to measure energy efficiency :math:`W` based on weighted sum of jerk, acceleration, and velocity.
.. math::
- W = \sum_{m=0}^{M} w_j\cdot |j(m,t)| + w_a\cdot |a(m,t)| + w_v \cdot |v(m,t)|,
+ W = \\sum_{m=0}^{M} w_j\\cdot |j(m,t)| + w_a\\cdot |a(m,t)| + w_v \\cdot |v(m,t)|,
- where :math:`w_j, w_a, w_v \in\mathbb{R}` are the weights and :math:`M\in\mathbb{N}` is the number of movers for which
+ where :math:`w_j, w_a, w_v \\in\\mathbb{R}` are the weights and :math:`M\\in\\mathbb{N}` is the number of movers for which
to measure the energy efficiency.
This class computes a weighted sum of the absolute values of jerk, acceleration, and velocity for all movers
diff --git a/magbotsim/utils/geometry_2D_utils.py b/magbotsim/utils/geometry_2D_utils.py
index beeffdb..880987b 100644
--- a/magbotsim/utils/geometry_2D_utils.py
+++ b/magbotsim/utils/geometry_2D_utils.py
@@ -3,7 +3,6 @@
##########################################################
import numpy as np
-from scipy.spatial.transform import Rotation as R
def check_line_segments_intersect(p1: np.ndarray, p2: np.ndarray, q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
@@ -82,25 +81,29 @@ def get_2D_rect_vertices(qpos: np.ndarray, size: np.ndarray) -> np.ndarray:
assert qpos.shape == (num_rectangles, 7)
assert size.shape == (num_rectangles, 2)
- quats = qpos[:, -4:].copy()
- r_quats = R.from_quat(quats, scalar_first=True) # quaternions are automatically normalized before initialization
- rot_mats = r_quats.as_matrix()
- assert rot_mats.shape == (num_rectangles, 3, 3)
- # vertices w.r.t. local frame of each rectangle
- vertices_l = np.array(
- [
- [-size[:, 0], -size[:, 0], size[:, 0], size[:, 0]],
- [-size[:, 1], size[:, 1], size[:, 1], -size[:, 1]],
- np.zeros((4, num_rectangles)),
- ]
- )
- vertices_l = np.swapaxes(vertices_l, 0, 2)
- vertices_l = np.swapaxes(vertices_l, 1, 2)
-
- # vertices w.r.t. base frame
- vertices_b = (rot_mats @ vertices_l)[:, :2, :] + np.repeat(qpos[:, :2].reshape((num_rectangles, 2, -1)), 4, axis=2)
-
- return vertices_b
+ # Extract yaw from quaternion (w, qx, qy, qz) at indices 3-6.
+ # yaw = arctan2(2(wz + xy), 1 - 2(y^2 + z^2))
+ w_q = qpos[:, 3]
+ x_q = qpos[:, 4]
+ y_q = qpos[:, 5]
+ z_q = qpos[:, 6]
+ yaw = np.arctan2(2.0 * (w_q * z_q + x_q * y_q), 1.0 - 2.0 * (y_q * y_q + z_q * z_q))
+ cos_t = np.cos(yaw) # (n,)
+ sin_t = np.sin(yaw) # (n,)
+
+ hx = size[:, 0] # (n,)
+ hy = size[:, 1] # (n,)
+
+ # Local-frame coordinates of the four vertices:
+ # v0=(-hx,-hy), v1=(-hx,+hy), v2=(+hx,+hy), v3=(+hx,-hy)
+ lx = np.column_stack([-hx, -hx, hx, hx]) # (n, 4)
+ ly = np.column_stack([-hy, hy, hy, -hy]) # (n, 4)
+
+ # Apply 2D rotation and translate to world frame
+ vx = cos_t[:, None] * lx - sin_t[:, None] * ly + qpos[:, 0:1] # (n, 4)
+ vy = sin_t[:, None] * lx + cos_t[:, None] * ly + qpos[:, 1:2] # (n, 4)
+
+ return np.stack([vx, vy], axis=1) # (n, 2, 4)
def check_rectangles_intersect(
diff --git a/pyproject.toml b/pyproject.toml
index a6fd860..e4c5800 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "magbotsim"
description = "Magentic robotics simulation including reinforcement learning environments based on MuJoCo"
readme = "README.md"
-requires-python = ">= 3.11, <3.14"
+requires-python = ">= 3.11, <3.15"
authors = [{ name = "Lara Bergmann", email = "lara.bergmann@uni-bielefeld.de" }]
keywords = ["Magnetic Robotics", "Reinforcement Learning", "Gymnasium", "PettingZoo", "RL", "AI", "Robotics", "Magnetic Levitation"]
classifiers = [
@@ -15,6 +15,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
+ "Programming Language :: Python :: 3.14",
'Intended Audience :: Science/Research',
'Topic :: Scientific/Engineering :: Artificial Intelligence',
]
diff --git a/tests/test_basic_env.py b/tests/test_basic_env.py
index aa2a676..9fb1b7e 100644
--- a/tests/test_basic_env.py
+++ b/tests/test_basic_env.py
@@ -2,10 +2,11 @@
# Copyright (c) 2024 Lara Bergmann, Bielefeld University #
##########################################################
-import pytest
+import mujoco
import numpy as np
+import pytest
+
from magbotsim import BasicMagBotEnv
-import mujoco
@pytest.mark.parametrize(
@@ -351,6 +352,64 @@ def test_initial_mover_zpos(mover_params, initial_mover_zpos):
assert np.allclose(mover_qpos[:, 2], initial_mover_zpos)
+@pytest.mark.parametrize(
+ 'mover_params, initial_mover_zpos',
+ [
+ # shape box
+ ({'shape': 'box', 'size': np.array([0.113 / 2, 0.113 / 2, 0.012 / 2]), 'mass': 0.63}, 0.001),
+ ({'shape': 'box', 'size': np.array([0.113 / 2, 0.113 / 2, 0.012 / 2]), 'mass': 0.63}, 0.004),
+ # shape cylinder
+ ({'shape': 'cylinder', 'size': np.array([0.113 / 2, 0.012 / 2, 0.01]), 'mass': 0.639}, 0.001),
+ ({'shape': 'cylinder', 'size': np.array([0.113 / 2, 0.012 / 2, 0.01]), 'mass': 0.639}, 0.004),
+ # shape mesh
+ (
+ {
+ 'shape': 'mesh',
+ 'mesh': {'mover_stl_path': 'beckhoff_apm4220_mover', 'bumper_stl_path': 'beckhoff_apm4220_bumper'},
+ 'mass': 0.639 - 0.034,
+ 'bumper_mass': 0.034,
+ },
+ 0.001,
+ ),
+ (
+ {
+ 'shape': 'mesh',
+ 'mesh': {'mover_stl_path': 'beckhoff_apm4220_mover', 'bumper_stl_path': 'beckhoff_apm4220_bumper'},
+ 'mass': 0.639 - 0.034,
+ 'bumper_mass': 0.034,
+ },
+ 0.004,
+ ),
+ ],
+)
+def test_initial_mover_zpos_slow_path(mover_params, initial_mover_zpos):
+ """get_mover_qpos should apply the z adjustment correctly when called with a subset of mover
+ names (slow path), not just when called with the full env.mover_names list (fast path).
+ """
+ collision_params = {
+ 'shape': 'circle',
+ 'size': 0.8,
+ 'offset': 0.0,
+ 'offset_wall': 0.0,
+ }
+ env = BasicMagBotEnv(
+ layout_tiles=np.ones((4, 4)),
+ num_movers=2,
+ mover_params=mover_params,
+ initial_mover_zpos=initial_mover_zpos,
+ collision_params=collision_params,
+ )
+ mujoco.mj_step(env.model, env.data, nstep=1)
+
+ # Pass a list (not the identity env.mover_names) to force the slow path for each mover
+ for mover_name in env.mover_names:
+ mover_qpos = env.get_mover_qpos(mover_names=[mover_name], add_noise=False)
+ assert mover_qpos.shape == (1, 7)
+ assert np.isclose(mover_qpos[0, 2], initial_mover_zpos), (
+ f'z adjustment incorrect for mover {mover_name!r}: expected {initial_mover_zpos}, got {mover_qpos[0, 2]}'
+ )
+
+
@pytest.mark.parametrize(
'mover_params, expected_size',
[